Browse Source

Merge pull request #2 from fancycode/read_write_buffercache

Reuse buffers when reading/writing data.
pull/4/head
Simon Eisenmann 12 years ago
parent
commit
12ed4d972b
  1. 1
      AUTHORS
  2. 65
      src/app/spreed-speakfreely-server/buffercache.go
  3. 28
      src/app/spreed-speakfreely-server/connection.go
  4. 3
      src/app/spreed-speakfreely-server/hub.go
  5. 8
      src/app/spreed-speakfreely-server/server.go

1
AUTHORS

@ -3,5 +3,6 @@ @@ -3,5 +3,6 @@
#
# Please keep the list sorted.
Joachim Bauch <bauch@struktur.de>
Simon Eisenmann <simon@struktur.de>

65
src/app/spreed-speakfreely-server/buffercache.go

@ -0,0 +1,65 @@ @@ -0,0 +1,65 @@
/*
* Spreed Speak Freely.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed Speak Freely.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"bytes"
)
type BufferCache interface {
Push(buffer *bytes.Buffer)
Pop() *bytes.Buffer
}
type bufferCache struct {
buffers chan *bytes.Buffer
initialSize int
}
func NewBufferCache(count int, initialSize int) BufferCache {
return &bufferCache{buffers: make(chan *bytes.Buffer, count), initialSize: initialSize}
}
func (cache *bufferCache) Push(buffer *bytes.Buffer) {
buffer.Reset()
select {
case cache.buffers <- buffer:
// buffer has been stored for reuse
break
default:
// buffer list full, buffer will be collected
break
}
}
func (cache *bufferCache) Pop() *bytes.Buffer {
var buffer *bytes.Buffer
select {
case buffer = <-cache.buffers:
// reuse existing buffer
break
default:
buffer = bytes.NewBuffer(make([]byte, 0, cache.initialSize))
break
}
return buffer
}

28
src/app/spreed-speakfreely-server/connection.go

@ -21,9 +21,9 @@ @@ -21,9 +21,9 @@
package main
import (
"bytes"
"github.com/gorilla/websocket"
"io"
"io/ioutil"
"log"
"sync"
"time"
@ -131,6 +131,25 @@ func (c *Connection) unregister() { @@ -131,6 +131,25 @@ func (c *Connection) unregister() {
c.h.unregisterHandler(c)
}
func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error {
var err error
defer func() {
e := recover()
if e == nil {
return
}
if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
err = panicErr
} else {
panic(e)
}
}()
dest.Reset()
_, err = dest.ReadFrom(r)
return err
}
// readPump pumps messages from the websocket connection to the hub.
func (c *Connection) readPump() {
@ -159,12 +178,15 @@ func (c *Connection) readPump() { @@ -159,12 +178,15 @@ func (c *Connection) readPump() {
}
switch op {
case websocket.TextMessage:
message, err := ioutil.ReadAll(r)
message := c.h.buffers.Pop()
err = c.readAll(message, r)
if err != nil {
c.h.buffers.Push(message)
break
}
<-ticker.C
c.h.server.OnText(c, message)
c.h.server.OnText(c, message.Bytes())
c.h.buffers.Push(message)
}
}
}

3
src/app/spreed-speakfreely-server/hub.go

@ -21,6 +21,7 @@ @@ -21,6 +21,7 @@
package main
import (
"bytes"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
@ -67,6 +68,7 @@ type Hub struct { @@ -67,6 +68,7 @@ type Hub struct {
tickets *securecookie.SecureCookie
count uint64
mutex sync.RWMutex
buffers BufferCache
}
func NewHub(version string, config *Config, sessionSecret string, turnSecret string) *Hub {
@ -82,6 +84,7 @@ func NewHub(version string, config *Config, sessionSecret string, turnSecret str @@ -82,6 +84,7 @@ func NewHub(version string, config *Config, sessionSecret string, turnSecret str
}
h.tickets = securecookie.New(h.sessionSecret, nil)
h.buffers = NewBufferCache(1024, bytes.MinRead)
return h
}

8
src/app/spreed-speakfreely-server/server.go

@ -146,15 +146,19 @@ func (s *Server) OnText(c *Connection, b []byte) { @@ -146,15 +146,19 @@ func (s *Server) OnText(c *Connection, b []byte) {
func (s *Server) Unicast(c *Connection, to string, m interface{}) {
b, err := json.Marshal(&DataOutgoing{From: c.Id, To: to, Data: m})
b := c.h.buffers.Pop()
encoder := json.NewEncoder(b)
err := encoder.Encode(&DataOutgoing{From: c.Id, To: to, Data: m})
if err != nil {
c.h.buffers.Push(b)
log.Println("Unicast error while encoding JSON", err)
return
}
var msg = &MessageRequest{From: c.Id, To: to, Message: b}
var msg = &MessageRequest{From: c.Id, To: to, Message: b.Bytes()}
c.h.unicastHandler(msg)
c.h.buffers.Push(b)
}

Loading…
Cancel
Save