From 17aba85b51b2a00f7783c5833e13431dae3d4f78 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 12 Mar 2014 17:39:16 +0100 Subject: [PATCH 1/2] Reuse buffers when reading/writing data. --- .../spreed-speakfreely-server/buffercache.go | 65 +++++++++++++++++++ .../spreed-speakfreely-server/connection.go | 28 +++++++- src/app/spreed-speakfreely-server/hub.go | 3 + src/app/spreed-speakfreely-server/server.go | 8 ++- 4 files changed, 99 insertions(+), 5 deletions(-) create mode 100644 src/app/spreed-speakfreely-server/buffercache.go diff --git a/src/app/spreed-speakfreely-server/buffercache.go b/src/app/spreed-speakfreely-server/buffercache.go new file mode 100644 index 00000000..b5543f67 --- /dev/null +++ b/src/app/spreed-speakfreely-server/buffercache.go @@ -0,0 +1,65 @@ +/* + * Spreed Speak Freely. + * Copyright (C) 2013-2014 struktur AG + * + * This file is part of Spreed Speak Freely. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ +package main + +import ( + "bytes" +) + +type BufferCache interface { + Push(buffer *bytes.Buffer) + + Pop() *bytes.Buffer +} + +type bufferCache struct { + buffers chan *bytes.Buffer + initialSize int +} + +func NewBufferCache(count int, initialSize int) BufferCache { + return &bufferCache{buffers: make(chan *bytes.Buffer, count), initialSize: initialSize} +} + +func (cache *bufferCache) Push(buffer *bytes.Buffer) { + buffer.Reset() + select { + case cache.buffers <- buffer: + // buffer has been stored for reuse + break + default: + // buffer list full, buffer will be collected + break + } +} + +func (cache *bufferCache) Pop() *bytes.Buffer { + var buffer *bytes.Buffer + select { + case buffer = <-cache.buffers: + // reuse existing buffer + break + default: + buffer = bytes.NewBuffer(make([]byte, 0, cache.initialSize)) + break + } + return buffer +} diff --git a/src/app/spreed-speakfreely-server/connection.go b/src/app/spreed-speakfreely-server/connection.go index 068f0184..3d059b38 100644 --- a/src/app/spreed-speakfreely-server/connection.go +++ b/src/app/spreed-speakfreely-server/connection.go @@ -21,9 +21,9 @@ package main import ( + "bytes" "github.com/gorilla/websocket" "io" - "io/ioutil" "log" "sync" "time" @@ -131,6 +131,25 @@ func (c *Connection) unregister() { c.h.unregisterHandler(c) } +func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error { + var err error + defer func() { + e := recover() + if e == nil { + return + } + if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge { + err = panicErr + } else { + panic(e) + } + }() + + dest.Reset() + _, err = dest.ReadFrom(r) + return err +} + // readPump pumps messages from the websocket connection to the hub. func (c *Connection) readPump() { @@ -159,12 +178,15 @@ func (c *Connection) readPump() { } switch op { case websocket.TextMessage: - message, err := ioutil.ReadAll(r) + message := c.h.buffers.Pop() + err = c.readAll(message, r) if err != nil { + c.h.buffers.Push(message) break } <-ticker.C - c.h.server.OnText(c, message) + c.h.server.OnText(c, message.Bytes()) + c.h.buffers.Push(message) } } } diff --git a/src/app/spreed-speakfreely-server/hub.go b/src/app/spreed-speakfreely-server/hub.go index d2cfe097..a1207618 100644 --- a/src/app/spreed-speakfreely-server/hub.go +++ b/src/app/spreed-speakfreely-server/hub.go @@ -21,6 +21,7 @@ package main import ( + "bytes" "crypto/hmac" "crypto/sha1" "encoding/base64" @@ -67,6 +68,7 @@ type Hub struct { tickets *securecookie.SecureCookie count uint64 mutex sync.RWMutex + buffers BufferCache } func NewHub(version string, config *Config, sessionSecret string, turnSecret string) *Hub { @@ -82,6 +84,7 @@ func NewHub(version string, config *Config, sessionSecret string, turnSecret str } h.tickets = securecookie.New(h.sessionSecret, nil) + h.buffers = NewBufferCache(1024, bytes.MinRead) return h } diff --git a/src/app/spreed-speakfreely-server/server.go b/src/app/spreed-speakfreely-server/server.go index 8ffc2aa4..4f195c3b 100644 --- a/src/app/spreed-speakfreely-server/server.go +++ b/src/app/spreed-speakfreely-server/server.go @@ -146,15 +146,19 @@ func (s *Server) OnText(c *Connection, b []byte) { func (s *Server) Unicast(c *Connection, to string, m interface{}) { - b, err := json.Marshal(&DataOutgoing{From: c.Id, To: to, Data: m}) + b := c.h.buffers.Pop() + encoder := json.NewEncoder(b) + err := encoder.Encode(&DataOutgoing{From: c.Id, To: to, Data: m}) if err != nil { + c.h.buffers.Push(b) log.Println("Unicast error while encoding JSON", err) return } - var msg = &MessageRequest{From: c.Id, To: to, Message: b} + var msg = &MessageRequest{From: c.Id, To: to, Message: b.Bytes()} c.h.unicastHandler(msg) + c.h.buffers.Push(b) } From 99e4f804436a1b75ec32a21c2c52aebb0381a248 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 12 Mar 2014 17:40:31 +0100 Subject: [PATCH 2/2] Added myself to authors. --- AUTHORS | 1 + 1 file changed, 1 insertion(+) diff --git a/AUTHORS b/AUTHORS index 75ff6363..6f3bab27 100644 --- a/AUTHORS +++ b/AUTHORS @@ -3,5 +3,6 @@ # # Please keep the list sorted. +Joachim Bauch Simon Eisenmann