Browse Source

Merge pull request #7 from fancycode/message_queue_list

Store pending messages in list.
pull/12/head
Simon Eisenmann 11 years ago
parent
commit
58fb2dac1b
  1. 31
      src/app/spreed-speakfreely-server/connection.go

31
src/app/spreed-speakfreely-server/connection.go

@ -59,7 +59,7 @@ type Connection struct { @@ -59,7 +59,7 @@ type Connection struct {
// Data handling.
condition *sync.Cond
queue []Buffer
queue list.List
mutex sync.Mutex
isClosed bool
@ -82,7 +82,6 @@ func NewConnection(h *Hub, ws *websocket.Conn, remoteAddr string) *Connection { @@ -82,7 +82,6 @@ func NewConnection(h *Hub, ws *websocket.Conn, remoteAddr string) *Connection {
RemoteAddr: remoteAddr,
}
c.condition = sync.NewCond(&c.mutex)
c.queue = make([]Buffer, 0)
return c
@ -95,7 +94,15 @@ func (c *Connection) close() { @@ -95,7 +94,15 @@ func (c *Connection) close() {
c.mutex.Lock()
c.User = nil
c.isClosed = true
c.queue = c.queue[:0]
for {
head := c.queue.Front()
if head == nil {
break
}
c.queue.Remove(head)
message := head.Value.(Buffer)
message.Decref()
}
c.condition.Signal()
c.mutex.Unlock()
}
@ -206,12 +213,12 @@ func (c *Connection) send(message Buffer) { @@ -206,12 +213,12 @@ func (c *Connection) send(message Buffer) {
return
}
//fmt.Println("Outbound queue size", c.Idx, len(c.queue))
if len(c.queue) >= maxQueueSize {
log.Println("Outbound queue overflow", c.Idx, len(c.queue))
if c.queue.Len() >= maxQueueSize {
log.Println("Outbound queue overflow", c.Idx, c.queue.Len())
return
}
message.Incref()
c.queue = append(c.queue, message)
c.queue.PushBack(message)
c.condition.Signal()
}
@ -240,7 +247,7 @@ func (c *Connection) writePump() { @@ -240,7 +247,7 @@ func (c *Connection) writePump() {
c.mutex.Lock()
// Wait until something todo.
for !ping && !c.isClosed && len(c.queue) == 0 {
for !ping && !c.isClosed && c.queue.Len() == 0 {
// Wait on signal (this also unlocks while waiting, and locks again when got the signal).
c.condition.Wait()
}
@ -250,9 +257,13 @@ func (c *Connection) writePump() { @@ -250,9 +257,13 @@ func (c *Connection) writePump() {
goto cleanup
}
// Flush queue if something.
for len(c.queue) > 0 {
message := c.queue[0]
c.queue = c.queue[1:]
for {
head := c.queue.Front()
if head == nil {
break
}
c.queue.Remove(head)
message := head.Value.(Buffer)
if ping {
// Send ping.
ping = false

Loading…
Cancel
Save