Browse Source

Store pending messages in list.

This avoids copying elements around as they are added to the back and fetched from the front.
pull/7/head
Joachim Bauch 12 years ago
parent
commit
fe3ec466c8
  1. 31
      src/app/spreed-speakfreely-server/connection.go

31
src/app/spreed-speakfreely-server/connection.go

@ -59,7 +59,7 @@ type Connection struct {
// Data handling. // Data handling.
condition *sync.Cond condition *sync.Cond
queue []Buffer queue list.List
mutex sync.Mutex mutex sync.Mutex
isClosed bool isClosed bool
@ -82,7 +82,6 @@ func NewConnection(h *Hub, ws *websocket.Conn, remoteAddr string) *Connection {
RemoteAddr: remoteAddr, RemoteAddr: remoteAddr,
} }
c.condition = sync.NewCond(&c.mutex) c.condition = sync.NewCond(&c.mutex)
c.queue = make([]Buffer, 0)
return c return c
@ -95,7 +94,15 @@ func (c *Connection) close() {
c.mutex.Lock() c.mutex.Lock()
c.User = nil c.User = nil
c.isClosed = true c.isClosed = true
c.queue = c.queue[:0] for {
head := c.queue.Front()
if head == nil {
break
}
c.queue.Remove(head)
message := head.Value.(Buffer)
message.Decref()
}
c.condition.Signal() c.condition.Signal()
c.mutex.Unlock() c.mutex.Unlock()
} }
@ -206,12 +213,12 @@ func (c *Connection) send(message Buffer) {
return return
} }
//fmt.Println("Outbound queue size", c.Idx, len(c.queue)) //fmt.Println("Outbound queue size", c.Idx, len(c.queue))
if len(c.queue) >= maxQueueSize { if c.queue.Len() >= maxQueueSize {
log.Println("Outbound queue overflow", c.Idx, len(c.queue)) log.Println("Outbound queue overflow", c.Idx, c.queue.Len())
return return
} }
message.Incref() message.Incref()
c.queue = append(c.queue, message) c.queue.PushBack(message)
c.condition.Signal() c.condition.Signal()
} }
@ -240,7 +247,7 @@ func (c *Connection) writePump() {
c.mutex.Lock() c.mutex.Lock()
// Wait until something todo. // Wait until something todo.
for !ping && !c.isClosed && len(c.queue) == 0 { for !ping && !c.isClosed && c.queue.Len() == 0 {
// Wait on signal (this also unlocks while waiting, and locks again when got the signal). // Wait on signal (this also unlocks while waiting, and locks again when got the signal).
c.condition.Wait() c.condition.Wait()
} }
@ -250,9 +257,13 @@ func (c *Connection) writePump() {
goto cleanup goto cleanup
} }
// Flush queue if something. // Flush queue if something.
for len(c.queue) > 0 { for {
message := c.queue[0] head := c.queue.Front()
c.queue = c.queue[1:] if head == nil {
break
}
c.queue.Remove(head)
message := head.Value.(Buffer)
if ping { if ping {
// Send ping. // Send ping.
ping = false ping = false

Loading…
Cancel
Save