Browse Source

Use multiple channels for caching of buffers.

This should help in high-concurrency situations but needs more profiling.
pull/9/head
Joachim Bauch 12 years ago
parent
commit
1d92587732
  1. 22
      src/app/spreed-speakfreely-server/buffercache.go

22
src/app/spreed-speakfreely-server/buffercache.go

@ -23,6 +23,7 @@ package main @@ -23,6 +23,7 @@ package main
import (
"bytes"
"io"
"runtime"
"sync/atomic"
)
@ -103,12 +104,23 @@ func (b *directBuffer) Decref() { @@ -103,12 +104,23 @@ func (b *directBuffer) Decref() {
}
type bufferCache struct {
buffers chan Buffer
buffers []chan Buffer
initialSize int
num int32
readPos int32
writePos int32
}
func NewBufferCache(count int, initialSize int) BufferCache {
return &bufferCache{buffers: make(chan Buffer, count), initialSize: initialSize}
result := &bufferCache{initialSize: initialSize}
result.num = int32(runtime.NumCPU())
result.buffers = make([]chan Buffer, result.num, result.num)
for i := int32(0); i < result.num; i++ {
result.buffers[i] = make(chan Buffer, count/runtime.NumCPU())
}
result.readPos = 0
result.writePos = result.num / 2
return result
}
func (cache *bufferCache) push(buffer Buffer) {
@ -117,8 +129,9 @@ func (cache *bufferCache) push(buffer Buffer) { @@ -117,8 +129,9 @@ func (cache *bufferCache) push(buffer Buffer) {
return
}
buffer.Reset()
pos := atomic.AddInt32(&cache.writePos, 1) % cache.num
select {
case cache.buffers <- buffer:
case cache.buffers[pos] <- buffer:
// buffer has been stored for reuse
break
default:
@ -129,8 +142,9 @@ func (cache *bufferCache) push(buffer Buffer) { @@ -129,8 +142,9 @@ func (cache *bufferCache) push(buffer Buffer) {
func (cache *bufferCache) New() Buffer {
var buffer Buffer
pos := atomic.AddInt32(&cache.readPos, 1) % cache.num
select {
case buffer = <-cache.buffers:
case buffer = <-cache.buffers[pos]:
// reuse existing buffer
buffer.Incref()
break

Loading…
Cancel
Save