Browse Source

Reuse buffers when reading/writing data.

pull/2/head
Joachim Bauch 12 years ago
parent
commit
17aba85b51
  1. 65
      src/app/spreed-speakfreely-server/buffercache.go
  2. 28
      src/app/spreed-speakfreely-server/connection.go
  3. 3
      src/app/spreed-speakfreely-server/hub.go
  4. 8
      src/app/spreed-speakfreely-server/server.go

65
src/app/spreed-speakfreely-server/buffercache.go

@ -0,0 +1,65 @@
/*
* Spreed Speak Freely.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed Speak Freely.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"bytes"
)
type BufferCache interface {
Push(buffer *bytes.Buffer)
Pop() *bytes.Buffer
}
type bufferCache struct {
buffers chan *bytes.Buffer
initialSize int
}
func NewBufferCache(count int, initialSize int) BufferCache {
return &bufferCache{buffers: make(chan *bytes.Buffer, count), initialSize: initialSize}
}
func (cache *bufferCache) Push(buffer *bytes.Buffer) {
buffer.Reset()
select {
case cache.buffers <- buffer:
// buffer has been stored for reuse
break
default:
// buffer list full, buffer will be collected
break
}
}
func (cache *bufferCache) Pop() *bytes.Buffer {
var buffer *bytes.Buffer
select {
case buffer = <-cache.buffers:
// reuse existing buffer
break
default:
buffer = bytes.NewBuffer(make([]byte, 0, cache.initialSize))
break
}
return buffer
}

28
src/app/spreed-speakfreely-server/connection.go

@ -21,9 +21,9 @@
package main package main
import ( import (
"bytes"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"io" "io"
"io/ioutil"
"log" "log"
"sync" "sync"
"time" "time"
@ -131,6 +131,25 @@ func (c *Connection) unregister() {
c.h.unregisterHandler(c) c.h.unregisterHandler(c)
} }
func (c *Connection) readAll(dest *bytes.Buffer, r io.Reader) error {
var err error
defer func() {
e := recover()
if e == nil {
return
}
if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
err = panicErr
} else {
panic(e)
}
}()
dest.Reset()
_, err = dest.ReadFrom(r)
return err
}
// readPump pumps messages from the websocket connection to the hub. // readPump pumps messages from the websocket connection to the hub.
func (c *Connection) readPump() { func (c *Connection) readPump() {
@ -159,12 +178,15 @@ func (c *Connection) readPump() {
} }
switch op { switch op {
case websocket.TextMessage: case websocket.TextMessage:
message, err := ioutil.ReadAll(r) message := c.h.buffers.Pop()
err = c.readAll(message, r)
if err != nil { if err != nil {
c.h.buffers.Push(message)
break break
} }
<-ticker.C <-ticker.C
c.h.server.OnText(c, message) c.h.server.OnText(c, message.Bytes())
c.h.buffers.Push(message)
} }
} }
} }

3
src/app/spreed-speakfreely-server/hub.go

@ -21,6 +21,7 @@
package main package main
import ( import (
"bytes"
"crypto/hmac" "crypto/hmac"
"crypto/sha1" "crypto/sha1"
"encoding/base64" "encoding/base64"
@ -67,6 +68,7 @@ type Hub struct {
tickets *securecookie.SecureCookie tickets *securecookie.SecureCookie
count uint64 count uint64
mutex sync.RWMutex mutex sync.RWMutex
buffers BufferCache
} }
func NewHub(version string, config *Config, sessionSecret string, turnSecret string) *Hub { func NewHub(version string, config *Config, sessionSecret string, turnSecret string) *Hub {
@ -82,6 +84,7 @@ func NewHub(version string, config *Config, sessionSecret string, turnSecret str
} }
h.tickets = securecookie.New(h.sessionSecret, nil) h.tickets = securecookie.New(h.sessionSecret, nil)
h.buffers = NewBufferCache(1024, bytes.MinRead)
return h return h
} }

8
src/app/spreed-speakfreely-server/server.go

@ -146,15 +146,19 @@ func (s *Server) OnText(c *Connection, b []byte) {
func (s *Server) Unicast(c *Connection, to string, m interface{}) { func (s *Server) Unicast(c *Connection, to string, m interface{}) {
b, err := json.Marshal(&DataOutgoing{From: c.Id, To: to, Data: m}) b := c.h.buffers.Pop()
encoder := json.NewEncoder(b)
err := encoder.Encode(&DataOutgoing{From: c.Id, To: to, Data: m})
if err != nil { if err != nil {
c.h.buffers.Push(b)
log.Println("Unicast error while encoding JSON", err) log.Println("Unicast error while encoding JSON", err)
return return
} }
var msg = &MessageRequest{From: c.Id, To: to, Message: b} var msg = &MessageRequest{From: c.Id, To: to, Message: b.Bytes()}
c.h.unicastHandler(msg) c.h.unicastHandler(msg)
c.h.buffers.Push(b)
} }

Loading…
Cancel
Save