Browse Source

Merge pull request #5 from fancycode/writepump_ping_goroutine

Writepump optimizations
pull/6/head
Simon Eisenmann 12 years ago
parent
commit
c0e15a58cd
  1. 45
      src/app/spreed-speakfreely-server/connection.go

45
src/app/spreed-speakfreely-server/connection.go

@ -153,11 +153,6 @@ func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error {
// readPump pumps messages from the websocket connection to the hub. // readPump pumps messages from the websocket connection to the hub.
func (c *Connection) readPump() { func (c *Connection) readPump() {
defer func() {
c.unregister()
c.ws.Close()
}()
c.ws.SetReadLimit(maxMessageSize) c.ws.SetReadLimit(maxMessageSize)
c.ws.SetReadDeadline(time.Now().Add(pongWait)) c.ws.SetReadDeadline(time.Now().Add(pongWait))
c.ws.SetPongHandler(func(string) error { c.ws.SetPongHandler(func(string) error {
@ -198,6 +193,9 @@ func (c *Connection) readPump() {
c.h.buffers.Push(message) c.h.buffers.Push(message)
} }
} }
c.unregister()
c.ws.Close()
} }
// Write message to outbound queue. // Write message to outbound queue.
@ -221,24 +219,11 @@ func (c *Connection) send(message []byte) {
// writePump pumps messages from the queue to the websocket connection. // writePump pumps messages from the queue to the websocket connection.
func (c *Connection) writePump() { func (c *Connection) writePump() {
closer := make(chan bool) var timer *time.Timer
ticker := time.NewTicker(pingPeriod)
ping := false ping := false
defer func() { // Spawn a timer to emit websocket pings.
//fmt.Println("writePump done") timer = time.AfterFunc(pingPeriod, func() {
closer <- true
ticker.Stop()
c.ws.Close()
}()
// Spawn a new go routine to emit websocket pings.
go func() {
for {
select {
case <-closer:
return
case <-ticker.C:
c.mutex.Lock() c.mutex.Lock()
if c.isClosed { if c.isClosed {
c.mutex.Unlock() c.mutex.Unlock()
@ -247,9 +232,8 @@ func (c *Connection) writePump() {
ping = true ping = true
c.condition.Signal() c.condition.Signal()
c.mutex.Unlock() c.mutex.Unlock()
} timer.Reset(pingPeriod)
} })
}()
// Wait for actions. // Wait for actions.
for { for {
@ -263,7 +247,7 @@ func (c *Connection) writePump() {
// Fast exit if in closed state. // Fast exit if in closed state.
if c.isClosed { if c.isClosed {
c.mutex.Unlock() c.mutex.Unlock()
return goto cleanup
} }
// Flush queue if something. // Flush queue if something.
for len(c.queue) > 0 { for len(c.queue) > 0 {
@ -275,14 +259,14 @@ func (c *Connection) writePump() {
c.mutex.Unlock() c.mutex.Unlock()
if err := c.ping(); err != nil { if err := c.ping(); err != nil {
log.Println("Error while sending ping", c.Idx, err) log.Println("Error while sending ping", c.Idx, err)
return goto cleanup
} }
} else { } else {
c.mutex.Unlock() c.mutex.Unlock()
} }
if err := c.write(websocket.TextMessage, message); err != nil { if err := c.write(websocket.TextMessage, message); err != nil {
log.Println("Error while writing", c.Idx, err) log.Println("Error while writing", c.Idx, err)
return goto cleanup
} }
c.mutex.Lock() c.mutex.Lock()
} }
@ -292,7 +276,7 @@ func (c *Connection) writePump() {
c.mutex.Unlock() c.mutex.Unlock()
if err := c.ping(); err != nil { if err := c.ping(); err != nil {
log.Println("Error while sending ping", c.Idx, err) log.Println("Error while sending ping", c.Idx, err)
return goto cleanup
} }
} else { } else {
// Final unlock. // Final unlock.
@ -300,6 +284,11 @@ func (c *Connection) writePump() {
} }
} }
cleanup:
//fmt.Println("writePump done")
timer.Stop()
c.ws.Close()
} }
// Write ping message. // Write ping message.

Loading…
Cancel
Save