Browse Source

Use refcounted buffers for IO.

Need to use buffers multiple times when broadcasting.
pull/6/head
Joachim Bauch 12 years ago
parent
commit
f185864ca9
  1. 98
      src/app/spreed-speakfreely-server/buffercache.go
  2. 23
      src/app/spreed-speakfreely-server/connection.go
  3. 8
      src/app/spreed-speakfreely-server/hub.go
  4. 8
      src/app/spreed-speakfreely-server/roomworker.go
  5. 20
      src/app/spreed-speakfreely-server/server.go

98
src/app/spreed-speakfreely-server/buffercache.go

@ -22,24 +22,100 @@ package main @@ -22,24 +22,100 @@ package main
import (
"bytes"
"io"
"sync/atomic"
)
type Buffer interface {
// bytes.Buffer
Reset()
Bytes() []byte
ReadFrom(r io.Reader) (n int64, err error)
// io.Writer
Write(p []byte) (n int, err error)
// provide direct access
GetBuffer() *bytes.Buffer
// refcounting
Incref()
Decref()
}
type BufferCache interface {
Push(buffer *bytes.Buffer)
New() Buffer
Wrap(data []byte) Buffer
}
type cachedBuffer struct {
bytes.Buffer
refcnt int32
cache *bufferCache
}
func (b *cachedBuffer) GetBuffer() *bytes.Buffer {
return &b.Buffer
}
Pop() *bytes.Buffer
func (b *cachedBuffer) Incref() {
atomic.AddInt32(&b.refcnt, 1)
}
func (b *cachedBuffer) Decref() {
if atomic.AddInt32(&b.refcnt, -1) == 0 {
b.cache.push(b)
}
}
type directBuffer struct {
buf *bytes.Buffer
refcnt int32
cache *bufferCache
}
func (b *directBuffer) Reset() {
b.buf.Reset()
}
func (b *directBuffer) Bytes() []byte {
return b.buf.Bytes()
}
func (b *directBuffer) ReadFrom(r io.Reader) (n int64, err error) {
return b.buf.ReadFrom(r)
}
func (b *directBuffer) Write(p []byte) (n int, err error) {
return b.buf.Write(p)
}
func (b *directBuffer) GetBuffer() *bytes.Buffer {
return b.buf
}
func (b *directBuffer) Incref() {
atomic.AddInt32(&b.refcnt, 1)
}
func (b *directBuffer) Decref() {
if atomic.AddInt32(&b.refcnt, -1) == 0 {
b.cache.push(b)
}
}
type bufferCache struct {
buffers chan *bytes.Buffer
buffers chan Buffer
initialSize int
}
func NewBufferCache(count int, initialSize int) BufferCache {
return &bufferCache{buffers: make(chan *bytes.Buffer, count), initialSize: initialSize}
return &bufferCache{buffers: make(chan Buffer, count), initialSize: initialSize}
}
func (cache *bufferCache) Push(buffer *bytes.Buffer) {
func (cache *bufferCache) push(buffer Buffer) {
if buffer, ok := buffer.(*directBuffer); ok {
buffer.Reset()
return
}
buffer.Reset()
select {
case cache.buffers <- buffer:
@ -51,15 +127,21 @@ func (cache *bufferCache) Push(buffer *bytes.Buffer) { @@ -51,15 +127,21 @@ func (cache *bufferCache) Push(buffer *bytes.Buffer) {
}
}
func (cache *bufferCache) Pop() *bytes.Buffer {
var buffer *bytes.Buffer
func (cache *bufferCache) New() Buffer {
var buffer Buffer
select {
case buffer = <-cache.buffers:
// reuse existing buffer
buffer.Incref()
break
default:
buffer = bytes.NewBuffer(make([]byte, 0, cache.initialSize))
buffer = &cachedBuffer{refcnt: 1, cache: cache}
buffer.GetBuffer().Grow(cache.initialSize)
break
}
return buffer
}
func (cache *bufferCache) Wrap(data []byte) Buffer {
return &directBuffer{refcnt: 1, cache: cache, buf: bytes.NewBuffer(data)}
}

23
src/app/spreed-speakfreely-server/connection.go

@ -59,7 +59,7 @@ type Connection struct { @@ -59,7 +59,7 @@ type Connection struct {
// Data handling.
condition *sync.Cond
queue [][]byte
queue []Buffer
mutex sync.Mutex
isClosed bool
@ -82,7 +82,7 @@ func NewConnection(h *Hub, ws *websocket.Conn, remoteAddr string) *Connection { @@ -82,7 +82,7 @@ func NewConnection(h *Hub, ws *websocket.Conn, remoteAddr string) *Connection {
RemoteAddr: remoteAddr,
}
c.condition = sync.NewCond(&c.mutex)
c.queue = make([][]byte, 0)
c.queue = make([]Buffer, 0)
return c
@ -132,7 +132,7 @@ func (c *Connection) unregister() { @@ -132,7 +132,7 @@ func (c *Connection) unregister() {
c.h.unregisterHandler(c)
}
func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error {
func (c *Connection) readAll(dest Buffer, r io.Reader) error {
var err error
defer func() {
e := recover()
@ -146,7 +146,6 @@ func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error { @@ -146,7 +146,6 @@ func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error {
}
}()
dest.Reset()
_, err = dest.ReadFrom(r)
return err
}
@ -172,10 +171,10 @@ func (c *Connection) readPump() { @@ -172,10 +171,10 @@ func (c *Connection) readPump() {
}
switch op {
case websocket.TextMessage:
message := c.h.buffers.Pop()
message := c.h.buffers.New()
err = c.readAll(message, r)
if err != nil {
c.h.buffers.Push(message)
message.Decref()
break
}
now := time.Now()
@ -189,8 +188,8 @@ func (c *Connection) readPump() { @@ -189,8 +188,8 @@ func (c *Connection) readPump() {
}
}
times.PushBack(now)
c.h.server.OnText(c, message.Bytes())
c.h.buffers.Push(message)
c.h.server.OnText(c, message)
message.Decref()
}
}
@ -199,7 +198,7 @@ func (c *Connection) readPump() { @@ -199,7 +198,7 @@ func (c *Connection) readPump() {
}
// Write message to outbound queue.
func (c *Connection) send(message []byte) {
func (c *Connection) send(message Buffer) {
c.mutex.Lock()
defer c.mutex.Unlock()
@ -211,6 +210,7 @@ func (c *Connection) send(message []byte) { @@ -211,6 +210,7 @@ func (c *Connection) send(message []byte) {
log.Println("Outbound queue overflow", c.Idx, len(c.queue))
return
}
message.Incref()
c.queue = append(c.queue, message)
c.condition.Signal()
@ -259,15 +259,18 @@ func (c *Connection) writePump() { @@ -259,15 +259,18 @@ func (c *Connection) writePump() {
c.mutex.Unlock()
if err := c.ping(); err != nil {
log.Println("Error while sending ping", c.Idx, err)
message.Decref()
goto cleanup
}
} else {
c.mutex.Unlock()
}
if err := c.write(websocket.TextMessage, message); err != nil {
if err := c.write(websocket.TextMessage, message.Bytes()); err != nil {
log.Println("Error while writing", c.Idx, err)
message.Decref()
goto cleanup
}
message.Decref()
c.mutex.Lock()
}
if ping {

8
src/app/spreed-speakfreely-server/hub.go

@ -42,7 +42,7 @@ const ( @@ -42,7 +42,7 @@ const (
type MessageRequest struct {
From string
To string
Message []byte
Message Buffer
Id string
}
@ -289,12 +289,16 @@ func (h *Hub) unicastHandler(m *MessageRequest) { @@ -289,12 +289,16 @@ func (h *Hub) unicastHandler(m *MessageRequest) {
func (h *Hub) aliveHandler(c *Connection, alive *DataAlive) {
aliveJson, err := json.Marshal(&DataOutgoing{From: c.Id, Data: alive})
aliveJson := h.buffers.New()
encoder := json.NewEncoder(aliveJson)
err := encoder.Encode(&DataOutgoing{From: c.Id, Data: alive})
if err != nil {
log.Println("Alive error while encoding JSON", err)
aliveJson.Decref()
return
}
c.send(aliveJson)
aliveJson.Decref()
}

8
src/app/spreed-speakfreely-server/roomworker.go

@ -164,12 +164,16 @@ func (r *RoomWorker) usersHandler(c *Connection) { @@ -164,12 +164,16 @@ func (r *RoomWorker) usersHandler(c *Connection) {
}
}
users.Users = ul
usersJson, err := json.Marshal(&DataOutgoing{From: c.Id, Data: users})
usersJson := c.h.buffers.New()
encoder := json.NewEncoder(usersJson)
err := encoder.Encode(&DataOutgoing{From: c.Id, Data: users})
if err != nil {
log.Println("Users error while encoding JSON", err)
usersJson.Decref()
return
}
c.send(usersJson)
usersJson.Decref()
}
@ -189,9 +193,11 @@ func (r *RoomWorker) broadcastHandler(m *MessageRequest) { @@ -189,9 +193,11 @@ func (r *RoomWorker) broadcastHandler(m *MessageRequest) {
}
//fmt.Printf("%s\n", m.Message)
ec.send(m.Message)
m.Message.Decref()
}
}
m.Message.Incref()
r.Run(worker)
}

20
src/app/spreed-speakfreely-server/server.go

@ -53,11 +53,11 @@ func (s *Server) OnUnregister(c *Connection) { @@ -53,11 +53,11 @@ func (s *Server) OnUnregister(c *Connection) {
}
}
func (s *Server) OnText(c *Connection, b []byte) {
func (s *Server) OnText(c *Connection, b Buffer) {
//log.Printf("OnText from %d: %s\n", c.id, b)
var msg DataIncoming
err := json.Unmarshal(b, &msg)
err := json.Unmarshal(b.Bytes(), &msg)
if err != nil {
log.Println("OnText error while decoding JSON", err)
log.Printf("JSON:\n%s\n", b)
@ -146,20 +146,18 @@ func (s *Server) OnText(c *Connection, b []byte) { @@ -146,20 +146,18 @@ func (s *Server) OnText(c *Connection, b []byte) {
func (s *Server) Unicast(c *Connection, to string, m interface{}) {
b := c.h.buffers.Pop()
b := c.h.buffers.New()
encoder := json.NewEncoder(b)
err := encoder.Encode(&DataOutgoing{From: c.Id, To: to, Data: m})
if err != nil {
c.h.buffers.Push(b)
b.Decref()
log.Println("Unicast error while encoding JSON", err)
return
}
var msg = &MessageRequest{From: c.Id, To: to, Message: b.Bytes()}
var msg = &MessageRequest{From: c.Id, To: to, Message: b}
c.h.unicastHandler(msg)
c.h.buffers.Push(b)
b.Decref()
}
func (s *Server) Alive(c *Connection, alive *DataAlive) {
@ -183,16 +181,18 @@ func (s *Server) Broadcast(c *Connection, m interface{}) { @@ -183,16 +181,18 @@ func (s *Server) Broadcast(c *Connection, m interface{}) {
return
}
buffer := c.h.buffers.Wrap(b)
if c.h.isGlobalRoomid(c.Roomid) {
c.h.RunForAllRooms(func(room *RoomWorker) {
var msg = &MessageRequest{From: c.Id, Message: b, Id: room.Id}
var msg = &MessageRequest{From: c.Id, Message: buffer, Id: room.Id}
room.broadcastHandler(msg)
})
} else {
var msg = &MessageRequest{From: c.Id, Message: b, Id: c.Roomid}
var msg = &MessageRequest{From: c.Id, Message: buffer, Id: c.Roomid}
room := c.h.GetRoom(c.Roomid)
room.broadcastHandler(msg)
}
buffer.Decref()
}

Loading…
Cancel
Save