|
|
@ -23,6 +23,7 @@ package main |
|
|
|
import ( |
|
|
|
import ( |
|
|
|
"bytes" |
|
|
|
"bytes" |
|
|
|
"io" |
|
|
|
"io" |
|
|
|
|
|
|
|
"runtime" |
|
|
|
"sync/atomic" |
|
|
|
"sync/atomic" |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
@ -103,12 +104,23 @@ func (b *directBuffer) Decref() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
type bufferCache struct { |
|
|
|
type bufferCache struct { |
|
|
|
buffers chan Buffer |
|
|
|
buffers []chan Buffer |
|
|
|
initialSize int |
|
|
|
initialSize int |
|
|
|
|
|
|
|
num int32 |
|
|
|
|
|
|
|
readPos int32 |
|
|
|
|
|
|
|
writePos int32 |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func NewBufferCache(count int, initialSize int) BufferCache { |
|
|
|
func NewBufferCache(count int, initialSize int) BufferCache { |
|
|
|
return &bufferCache{buffers: make(chan Buffer, count), initialSize: initialSize} |
|
|
|
result := &bufferCache{initialSize: initialSize} |
|
|
|
|
|
|
|
result.num = int32(runtime.NumCPU()) |
|
|
|
|
|
|
|
result.buffers = make([]chan Buffer, result.num, result.num) |
|
|
|
|
|
|
|
for i := int32(0); i < result.num; i++ { |
|
|
|
|
|
|
|
result.buffers[i] = make(chan Buffer, count/runtime.NumCPU()) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
result.readPos = 0 |
|
|
|
|
|
|
|
result.writePos = result.num / 2 |
|
|
|
|
|
|
|
return result |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (cache *bufferCache) push(buffer Buffer) { |
|
|
|
func (cache *bufferCache) push(buffer Buffer) { |
|
|
@ -117,8 +129,9 @@ func (cache *bufferCache) push(buffer Buffer) { |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
buffer.Reset() |
|
|
|
buffer.Reset() |
|
|
|
|
|
|
|
pos := atomic.AddInt32(&cache.writePos, 1) % cache.num |
|
|
|
select { |
|
|
|
select { |
|
|
|
case cache.buffers <- buffer: |
|
|
|
case cache.buffers[pos] <- buffer: |
|
|
|
// buffer has been stored for reuse
|
|
|
|
// buffer has been stored for reuse
|
|
|
|
break |
|
|
|
break |
|
|
|
default: |
|
|
|
default: |
|
|
@ -129,8 +142,9 @@ func (cache *bufferCache) push(buffer Buffer) { |
|
|
|
|
|
|
|
|
|
|
|
func (cache *bufferCache) New() Buffer { |
|
|
|
func (cache *bufferCache) New() Buffer { |
|
|
|
var buffer Buffer |
|
|
|
var buffer Buffer |
|
|
|
|
|
|
|
pos := atomic.AddInt32(&cache.readPos, 1) % cache.num |
|
|
|
select { |
|
|
|
select { |
|
|
|
case buffer = <-cache.buffers: |
|
|
|
case buffer = <-cache.buffers[pos]: |
|
|
|
// reuse existing buffer
|
|
|
|
// reuse existing buffer
|
|
|
|
buffer.Incref() |
|
|
|
buffer.Incref() |
|
|
|
break |
|
|
|
break |
|
|
|