diff --git a/src/app/spreed-speakfreely-server/buffercache.go b/src/app/spreed-speakfreely-server/buffercache.go index b5543f67..1d1190b6 100644 --- a/src/app/spreed-speakfreely-server/buffercache.go +++ b/src/app/spreed-speakfreely-server/buffercache.go @@ -22,24 +22,100 @@ package main import ( "bytes" + "io" + "sync/atomic" ) +type Buffer interface { + // bytes.Buffer + Reset() + Bytes() []byte + ReadFrom(r io.Reader) (n int64, err error) + // io.Writer + Write(p []byte) (n int, err error) + // provide direct access + GetBuffer() *bytes.Buffer + // refcounting + Incref() + Decref() +} + type BufferCache interface { - Push(buffer *bytes.Buffer) + New() Buffer + + Wrap(data []byte) Buffer +} + +type cachedBuffer struct { + bytes.Buffer + refcnt int32 + cache *bufferCache +} + +func (b *cachedBuffer) GetBuffer() *bytes.Buffer { + return &b.Buffer +} - Pop() *bytes.Buffer +func (b *cachedBuffer) Incref() { + atomic.AddInt32(&b.refcnt, 1) +} + +func (b *cachedBuffer) Decref() { + if atomic.AddInt32(&b.refcnt, -1) == 0 { + b.cache.push(b) + } +} + +type directBuffer struct { + buf *bytes.Buffer + refcnt int32 + cache *bufferCache +} + +func (b *directBuffer) Reset() { + b.buf.Reset() +} + +func (b *directBuffer) Bytes() []byte { + return b.buf.Bytes() +} + +func (b *directBuffer) ReadFrom(r io.Reader) (n int64, err error) { + return b.buf.ReadFrom(r) +} + +func (b *directBuffer) Write(p []byte) (n int, err error) { + return b.buf.Write(p) +} + +func (b *directBuffer) GetBuffer() *bytes.Buffer { + return b.buf +} + +func (b *directBuffer) Incref() { + atomic.AddInt32(&b.refcnt, 1) +} + +func (b *directBuffer) Decref() { + if atomic.AddInt32(&b.refcnt, -1) == 0 { + b.cache.push(b) + } } type bufferCache struct { - buffers chan *bytes.Buffer + buffers chan Buffer initialSize int } func NewBufferCache(count int, initialSize int) BufferCache { - return &bufferCache{buffers: make(chan *bytes.Buffer, count), initialSize: initialSize} + return &bufferCache{buffers: make(chan Buffer, count), initialSize: initialSize} } -func (cache *bufferCache) Push(buffer *bytes.Buffer) { +func (cache *bufferCache) push(buffer Buffer) { + if buffer, ok := buffer.(*directBuffer); ok { + buffer.Reset() + return + } buffer.Reset() select { case cache.buffers <- buffer: @@ -51,15 +127,21 @@ func (cache *bufferCache) Push(buffer *bytes.Buffer) { } } -func (cache *bufferCache) Pop() *bytes.Buffer { - var buffer *bytes.Buffer +func (cache *bufferCache) New() Buffer { + var buffer Buffer select { case buffer = <-cache.buffers: // reuse existing buffer + buffer.Incref() break default: - buffer = bytes.NewBuffer(make([]byte, 0, cache.initialSize)) + buffer = &cachedBuffer{refcnt: 1, cache: cache} + buffer.GetBuffer().Grow(cache.initialSize) break } return buffer } + +func (cache *bufferCache) Wrap(data []byte) Buffer { + return &directBuffer{refcnt: 1, cache: cache, buf: bytes.NewBuffer(data)} +} diff --git a/src/app/spreed-speakfreely-server/connection.go b/src/app/spreed-speakfreely-server/connection.go index 595c4f90..68b4639c 100644 --- a/src/app/spreed-speakfreely-server/connection.go +++ b/src/app/spreed-speakfreely-server/connection.go @@ -59,7 +59,7 @@ type Connection struct { // Data handling. condition *sync.Cond - queue [][]byte + queue []Buffer mutex sync.Mutex isClosed bool @@ -82,7 +82,7 @@ func NewConnection(h *Hub, ws *websocket.Conn, remoteAddr string) *Connection { RemoteAddr: remoteAddr, } c.condition = sync.NewCond(&c.mutex) - c.queue = make([][]byte, 0) + c.queue = make([]Buffer, 0) return c @@ -132,7 +132,7 @@ func (c *Connection) unregister() { c.h.unregisterHandler(c) } -func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error { +func (c *Connection) readAll(dest Buffer, r io.Reader) error { var err error defer func() { e := recover() @@ -146,7 +146,6 @@ func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error { } }() - dest.Reset() _, err = dest.ReadFrom(r) return err } @@ -172,10 +171,10 @@ func (c *Connection) readPump() { } switch op { case websocket.TextMessage: - message := c.h.buffers.Pop() + message := c.h.buffers.New() err = c.readAll(message, r) if err != nil { - c.h.buffers.Push(message) + message.Decref() break } now := time.Now() @@ -189,8 +188,8 @@ func (c *Connection) readPump() { } } times.PushBack(now) - c.h.server.OnText(c, message.Bytes()) - c.h.buffers.Push(message) + c.h.server.OnText(c, message) + message.Decref() } } @@ -199,7 +198,7 @@ func (c *Connection) readPump() { } // Write message to outbound queue. -func (c *Connection) send(message []byte) { +func (c *Connection) send(message Buffer) { c.mutex.Lock() defer c.mutex.Unlock() @@ -211,6 +210,7 @@ func (c *Connection) send(message []byte) { log.Println("Outbound queue overflow", c.Idx, len(c.queue)) return } + message.Incref() c.queue = append(c.queue, message) c.condition.Signal() @@ -259,15 +259,18 @@ func (c *Connection) writePump() { c.mutex.Unlock() if err := c.ping(); err != nil { log.Println("Error while sending ping", c.Idx, err) + message.Decref() goto cleanup } } else { c.mutex.Unlock() } - if err := c.write(websocket.TextMessage, message); err != nil { + if err := c.write(websocket.TextMessage, message.Bytes()); err != nil { log.Println("Error while writing", c.Idx, err) + message.Decref() goto cleanup } + message.Decref() c.mutex.Lock() } if ping { diff --git a/src/app/spreed-speakfreely-server/hub.go b/src/app/spreed-speakfreely-server/hub.go index f8ae496d..36237ba4 100644 --- a/src/app/spreed-speakfreely-server/hub.go +++ b/src/app/spreed-speakfreely-server/hub.go @@ -42,7 +42,7 @@ const ( type MessageRequest struct { From string To string - Message []byte + Message Buffer Id string } @@ -289,12 +289,16 @@ func (h *Hub) unicastHandler(m *MessageRequest) { func (h *Hub) aliveHandler(c *Connection, alive *DataAlive) { - aliveJson, err := json.Marshal(&DataOutgoing{From: c.Id, Data: alive}) + aliveJson := h.buffers.New() + encoder := json.NewEncoder(aliveJson) + err := encoder.Encode(&DataOutgoing{From: c.Id, Data: alive}) if err != nil { log.Println("Alive error while encoding JSON", err) + aliveJson.Decref() return } c.send(aliveJson) + aliveJson.Decref() } diff --git a/src/app/spreed-speakfreely-server/roomworker.go b/src/app/spreed-speakfreely-server/roomworker.go index 741acdb3..f2477397 100644 --- a/src/app/spreed-speakfreely-server/roomworker.go +++ b/src/app/spreed-speakfreely-server/roomworker.go @@ -164,12 +164,16 @@ func (r *RoomWorker) usersHandler(c *Connection) { } } users.Users = ul - usersJson, err := json.Marshal(&DataOutgoing{From: c.Id, Data: users}) + usersJson := c.h.buffers.New() + encoder := json.NewEncoder(usersJson) + err := encoder.Encode(&DataOutgoing{From: c.Id, Data: users}) if err != nil { log.Println("Users error while encoding JSON", err) + usersJson.Decref() return } c.send(usersJson) + usersJson.Decref() } @@ -189,9 +193,11 @@ func (r *RoomWorker) broadcastHandler(m *MessageRequest) { } //fmt.Printf("%s\n", m.Message) ec.send(m.Message) + m.Message.Decref() } } + m.Message.Incref() r.Run(worker) } diff --git a/src/app/spreed-speakfreely-server/server.go b/src/app/spreed-speakfreely-server/server.go index 4f195c3b..ae677bfe 100644 --- a/src/app/spreed-speakfreely-server/server.go +++ b/src/app/spreed-speakfreely-server/server.go @@ -53,11 +53,11 @@ func (s *Server) OnUnregister(c *Connection) { } } -func (s *Server) OnText(c *Connection, b []byte) { +func (s *Server) OnText(c *Connection, b Buffer) { //log.Printf("OnText from %d: %s\n", c.id, b) var msg DataIncoming - err := json.Unmarshal(b, &msg) + err := json.Unmarshal(b.Bytes(), &msg) if err != nil { log.Println("OnText error while decoding JSON", err) log.Printf("JSON:\n%s\n", b) @@ -146,20 +146,18 @@ func (s *Server) OnText(c *Connection, b []byte) { func (s *Server) Unicast(c *Connection, to string, m interface{}) { - b := c.h.buffers.Pop() + b := c.h.buffers.New() encoder := json.NewEncoder(b) err := encoder.Encode(&DataOutgoing{From: c.Id, To: to, Data: m}) - if err != nil { - c.h.buffers.Push(b) + b.Decref() log.Println("Unicast error while encoding JSON", err) return } - var msg = &MessageRequest{From: c.Id, To: to, Message: b.Bytes()} + var msg = &MessageRequest{From: c.Id, To: to, Message: b} c.h.unicastHandler(msg) - c.h.buffers.Push(b) - + b.Decref() } func (s *Server) Alive(c *Connection, alive *DataAlive) { @@ -183,16 +181,18 @@ func (s *Server) Broadcast(c *Connection, m interface{}) { return } + buffer := c.h.buffers.Wrap(b) if c.h.isGlobalRoomid(c.Roomid) { c.h.RunForAllRooms(func(room *RoomWorker) { - var msg = &MessageRequest{From: c.Id, Message: b, Id: room.Id} + var msg = &MessageRequest{From: c.Id, Message: buffer, Id: room.Id} room.broadcastHandler(msg) }) } else { - var msg = &MessageRequest{From: c.Id, Message: b, Id: c.Roomid} + var msg = &MessageRequest{From: c.Id, Message: buffer, Id: c.Roomid} room := c.h.GetRoom(c.Roomid) room.broadcastHandler(msg) } + buffer.Decref() }