diff --git a/src/app/spreed-speakfreely-server/buffercache.go b/src/app/spreed-speakfreely-server/buffercache.go index 1d1190b6..ad600dbc 100644 --- a/src/app/spreed-speakfreely-server/buffercache.go +++ b/src/app/spreed-speakfreely-server/buffercache.go @@ -23,6 +23,7 @@ package main import ( "bytes" "io" + "runtime" "sync/atomic" ) @@ -103,12 +104,23 @@ func (b *directBuffer) Decref() { } type bufferCache struct { - buffers chan Buffer + buffers []chan Buffer initialSize int + num int32 + readPos int32 + writePos int32 } func NewBufferCache(count int, initialSize int) BufferCache { - return &bufferCache{buffers: make(chan Buffer, count), initialSize: initialSize} + result := &bufferCache{initialSize: initialSize} + result.num = int32(runtime.NumCPU()) + result.buffers = make([]chan Buffer, result.num, result.num) + for i := int32(0); i < result.num; i++ { + result.buffers[i] = make(chan Buffer, count/runtime.NumCPU()) + } + result.readPos = 0 + result.writePos = result.num / 2 + return result } func (cache *bufferCache) push(buffer Buffer) { @@ -117,8 +129,9 @@ func (cache *bufferCache) push(buffer Buffer) { return } buffer.Reset() + pos := atomic.AddInt32(&cache.writePos, 1) % cache.num select { - case cache.buffers <- buffer: + case cache.buffers[pos] <- buffer: // buffer has been stored for reuse break default: @@ -129,8 +142,9 @@ func (cache *bufferCache) push(buffer Buffer) { func (cache *bufferCache) New() Buffer { var buffer Buffer + pos := atomic.AddInt32(&cache.readPos, 1) % cache.num select { - case buffer = <-cache.buffers: + case buffer = <-cache.buffers[pos]: // reuse existing buffer buffer.Incref() break