|
|
|
@ -59,7 +59,7 @@ type Connection struct {
@@ -59,7 +59,7 @@ type Connection struct {
|
|
|
|
|
|
|
|
|
|
// Data handling.
|
|
|
|
|
condition *sync.Cond |
|
|
|
|
queue [][]byte |
|
|
|
|
queue []Buffer |
|
|
|
|
mutex sync.Mutex |
|
|
|
|
isClosed bool |
|
|
|
|
|
|
|
|
@ -82,7 +82,7 @@ func NewConnection(h *Hub, ws *websocket.Conn, remoteAddr string) *Connection {
@@ -82,7 +82,7 @@ func NewConnection(h *Hub, ws *websocket.Conn, remoteAddr string) *Connection {
|
|
|
|
|
RemoteAddr: remoteAddr, |
|
|
|
|
} |
|
|
|
|
c.condition = sync.NewCond(&c.mutex) |
|
|
|
|
c.queue = make([][]byte, 0) |
|
|
|
|
c.queue = make([]Buffer, 0) |
|
|
|
|
|
|
|
|
|
return c |
|
|
|
|
|
|
|
|
@ -132,7 +132,7 @@ func (c *Connection) unregister() {
@@ -132,7 +132,7 @@ func (c *Connection) unregister() {
|
|
|
|
|
c.h.unregisterHandler(c) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error { |
|
|
|
|
func (c *Connection) readAll(dest Buffer, r io.Reader) error { |
|
|
|
|
var err error |
|
|
|
|
defer func() { |
|
|
|
|
e := recover() |
|
|
|
@ -146,7 +146,6 @@ func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error {
@@ -146,7 +146,6 @@ func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error {
|
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
dest.Reset() |
|
|
|
|
_, err = dest.ReadFrom(r) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
@ -172,10 +171,10 @@ func (c *Connection) readPump() {
@@ -172,10 +171,10 @@ func (c *Connection) readPump() {
|
|
|
|
|
} |
|
|
|
|
switch op { |
|
|
|
|
case websocket.TextMessage: |
|
|
|
|
message := c.h.buffers.Pop() |
|
|
|
|
message := c.h.buffers.New() |
|
|
|
|
err = c.readAll(message, r) |
|
|
|
|
if err != nil { |
|
|
|
|
c.h.buffers.Push(message) |
|
|
|
|
message.Decref() |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
now := time.Now() |
|
|
|
@ -189,8 +188,8 @@ func (c *Connection) readPump() {
@@ -189,8 +188,8 @@ func (c *Connection) readPump() {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
times.PushBack(now) |
|
|
|
|
c.h.server.OnText(c, message.Bytes()) |
|
|
|
|
c.h.buffers.Push(message) |
|
|
|
|
c.h.server.OnText(c, message) |
|
|
|
|
message.Decref() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -199,7 +198,7 @@ func (c *Connection) readPump() {
@@ -199,7 +198,7 @@ func (c *Connection) readPump() {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Write message to outbound queue.
|
|
|
|
|
func (c *Connection) send(message []byte) { |
|
|
|
|
func (c *Connection) send(message Buffer) { |
|
|
|
|
|
|
|
|
|
c.mutex.Lock() |
|
|
|
|
defer c.mutex.Unlock() |
|
|
|
@ -211,6 +210,7 @@ func (c *Connection) send(message []byte) {
@@ -211,6 +210,7 @@ func (c *Connection) send(message []byte) {
|
|
|
|
|
log.Println("Outbound queue overflow", c.Idx, len(c.queue)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
message.Incref() |
|
|
|
|
c.queue = append(c.queue, message) |
|
|
|
|
c.condition.Signal() |
|
|
|
|
|
|
|
|
@ -259,15 +259,18 @@ func (c *Connection) writePump() {
@@ -259,15 +259,18 @@ func (c *Connection) writePump() {
|
|
|
|
|
c.mutex.Unlock() |
|
|
|
|
if err := c.ping(); err != nil { |
|
|
|
|
log.Println("Error while sending ping", c.Idx, err) |
|
|
|
|
message.Decref() |
|
|
|
|
goto cleanup |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
c.mutex.Unlock() |
|
|
|
|
} |
|
|
|
|
if err := c.write(websocket.TextMessage, message); err != nil { |
|
|
|
|
if err := c.write(websocket.TextMessage, message.Bytes()); err != nil { |
|
|
|
|
log.Println("Error while writing", c.Idx, err) |
|
|
|
|
message.Decref() |
|
|
|
|
goto cleanup |
|
|
|
|
} |
|
|
|
|
message.Decref() |
|
|
|
|
c.mutex.Lock() |
|
|
|
|
} |
|
|
|
|
if ping { |
|
|
|
|