diff --git a/src/app/spreed-speakfreely-server/connection.go b/src/app/spreed-speakfreely-server/connection.go index 68b4639c..0e9d211e 100644 --- a/src/app/spreed-speakfreely-server/connection.go +++ b/src/app/spreed-speakfreely-server/connection.go @@ -59,7 +59,7 @@ type Connection struct { // Data handling. condition *sync.Cond - queue []Buffer + queue list.List mutex sync.Mutex isClosed bool @@ -82,7 +82,6 @@ func NewConnection(h *Hub, ws *websocket.Conn, remoteAddr string) *Connection { RemoteAddr: remoteAddr, } c.condition = sync.NewCond(&c.mutex) - c.queue = make([]Buffer, 0) return c @@ -95,7 +94,15 @@ func (c *Connection) close() { c.mutex.Lock() c.User = nil c.isClosed = true - c.queue = c.queue[:0] + for { + head := c.queue.Front() + if head == nil { + break + } + c.queue.Remove(head) + message := head.Value.(Buffer) + message.Decref() + } c.condition.Signal() c.mutex.Unlock() } @@ -206,12 +213,12 @@ func (c *Connection) send(message Buffer) { return } //fmt.Println("Outbound queue size", c.Idx, len(c.queue)) - if len(c.queue) >= maxQueueSize { - log.Println("Outbound queue overflow", c.Idx, len(c.queue)) + if c.queue.Len() >= maxQueueSize { + log.Println("Outbound queue overflow", c.Idx, c.queue.Len()) return } message.Incref() - c.queue = append(c.queue, message) + c.queue.PushBack(message) c.condition.Signal() } @@ -240,7 +247,7 @@ func (c *Connection) writePump() { c.mutex.Lock() // Wait until something todo. - for !ping && !c.isClosed && len(c.queue) == 0 { + for !ping && !c.isClosed && c.queue.Len() == 0 { // Wait on signal (this also unlocks while waiting, and locks again when got the signal). c.condition.Wait() } @@ -250,9 +257,13 @@ func (c *Connection) writePump() { goto cleanup } // Flush queue if something. - for len(c.queue) > 0 { - message := c.queue[0] - c.queue = c.queue[1:] + for { + head := c.queue.Front() + if head == nil { + break + } + c.queue.Remove(head) + message := head.Value.(Buffer) if ping { // Send ping. ping = false