diff --git a/src/app/spreed-speakfreely-server/connection.go b/src/app/spreed-speakfreely-server/connection.go index 2de439a7..595c4f90 100644 --- a/src/app/spreed-speakfreely-server/connection.go +++ b/src/app/spreed-speakfreely-server/connection.go @@ -153,11 +153,6 @@ func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error { // readPump pumps messages from the websocket connection to the hub. func (c *Connection) readPump() { - defer func() { - c.unregister() - c.ws.Close() - }() - c.ws.SetReadLimit(maxMessageSize) c.ws.SetReadDeadline(time.Now().Add(pongWait)) c.ws.SetPongHandler(func(string) error { @@ -198,6 +193,9 @@ func (c *Connection) readPump() { c.h.buffers.Push(message) } } + + c.unregister() + c.ws.Close() } // Write message to outbound queue. @@ -221,35 +219,21 @@ func (c *Connection) send(message []byte) { // writePump pumps messages from the queue to the websocket connection. func (c *Connection) writePump() { - closer := make(chan bool) - ticker := time.NewTicker(pingPeriod) + var timer *time.Timer ping := false - defer func() { - //fmt.Println("writePump done") - closer <- true - ticker.Stop() - c.ws.Close() - }() - - // Spawn a new go routine to emit websocket pings. - go func() { - for { - select { - case <-closer: - return - case <-ticker.C: - c.mutex.Lock() - if c.isClosed { - c.mutex.Unlock() - return - } - ping = true - c.condition.Signal() - c.mutex.Unlock() - } + // Spawn a timer to emit websocket pings. + timer = time.AfterFunc(pingPeriod, func() { + c.mutex.Lock() + if c.isClosed { + c.mutex.Unlock() + return } - }() + ping = true + c.condition.Signal() + c.mutex.Unlock() + timer.Reset(pingPeriod) + }) // Wait for actions. for { @@ -263,7 +247,7 @@ func (c *Connection) writePump() { // Fast exit if in closed state. if c.isClosed { c.mutex.Unlock() - return + goto cleanup } // Flush queue if something. for len(c.queue) > 0 { @@ -275,14 +259,14 @@ func (c *Connection) writePump() { c.mutex.Unlock() if err := c.ping(); err != nil { log.Println("Error while sending ping", c.Idx, err) - return + goto cleanup } } else { c.mutex.Unlock() } if err := c.write(websocket.TextMessage, message); err != nil { log.Println("Error while writing", c.Idx, err) - return + goto cleanup } c.mutex.Lock() } @@ -292,7 +276,7 @@ func (c *Connection) writePump() { c.mutex.Unlock() if err := c.ping(); err != nil { log.Println("Error while sending ping", c.Idx, err) - return + goto cleanup } } else { // Final unlock. @@ -300,6 +284,11 @@ func (c *Connection) writePump() { } } + +cleanup: + //fmt.Println("writePump done") + timer.Stop() + c.ws.Close() } // Write ping message.