Skip to content

Commit

Permalink
改变 pool.NewBuffer 的行为,响应固定容量的BufferPool (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkeridea authored Feb 23, 2021
1 parent d07263d commit 78491dc
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 423 deletions.
103 changes: 0 additions & 103 deletions pool/benchmark/buffer_test.go

This file was deleted.

97 changes: 11 additions & 86 deletions pool/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,12 @@ package pool

import (
"bytes"
"math/bits"
"sort"
"sync"
"sync/atomic"
"unsafe"

"github.com/thinkeridea/go-extend/exsync"
)

const (
bucketSize = 22
minSizeBits = 6
minSize = 1 << minSizeBits // 2^6=64
maxSize = 1 << (minSizeBits + bucketSize - 1) // 2^28=256MB
calibrateCallsThreshold = 10240
)

var (
buffBucket = [bucketSize]sync.Pool{}
)

// BufferPool bytes.Buffer 的 sync.Pool 接口
// 可以直接 Get *bytes.Buffer 并 Reset Buffer
type BufferPool interface {
Expand All @@ -36,77 +21,6 @@ type BufferPool interface {
Put(*bytes.Buffer)
}

type buffer struct {
index uint32
calibrating uint32
release uint32
calls [bucketSize]uint32
}

// NewBuffer 创建一个动态评估需求容量的 BufferPool,它们共享一个底层 bytes.Buffer 区间池
// Deprecated: 这是错误测试产生的结果,它并没有相较 sync.Pool 有明显优势,详细参看: https://github.com/thinkeridea/go-extend/issues/17
func NewBuffer(size int) BufferPool {
b := &buffer{}
b.index = uint32(buffBucketIndex(size))
return b
}

// Get 从 Pool 中获取一个 *bytes.Buffer 实例, 该实例已经被 Reset
func (p *buffer) Get() *bytes.Buffer {
idx := atomic.LoadUint32(&p.index)
v := buffBucket[idx].Get()
if v != nil {
b := v.(*bytes.Buffer)
b.Reset()
return b
}

return bytes.NewBuffer(make([]byte, 0, minSize<<idx))
}

// Put 把 *bytes.Buffer 放回 Pool 中
func (p *buffer) Put(b *bytes.Buffer) {
if b.Cap() <= maxSize {
buffBucket[buffBucketIndex(b.Cap())].Put(b)
}

atomic.AddUint32(&p.calls[buffBucketIndex(bufferLen(b))], 1)
if atomic.AddUint32(&p.release, 1) > calibrateCallsThreshold {
p.calibrate()
}
}

func (p *buffer) calibrate() {
if !atomic.CompareAndSwapUint32(&p.calibrating, 0, 1) {
return
}

var callSize [bucketSize]uint64
for i := range callSize {
callSize[i] = uint64(atomic.SwapUint32(&p.calls[i], 0))<<32 | minSize<<i
}

sort.Slice(callSize[:], func(i, j int) bool {
return callSize[i] > callSize[j]
})

atomic.SwapUint32(&p.index, uint32(buffBucketIndex(int(callSize[0]<<32>>32))))
atomic.StoreUint32(&p.release, 0)
atomic.SwapUint32(&p.calibrating, 0)
}

func buffBucketIndex(n int) int {
if n == 0 {
return 0
}

idx := bits.Len32(uint32((n - 1) >> minSizeBits))
if idx >= bucketSize {
idx = bucketSize - 1
}
return idx
}

var (
buff64 exsync.OncePointer
buff128 exsync.OncePointer
Expand All @@ -121,6 +35,17 @@ type bufferPool struct {
sync.Pool
}

// NewBuffer 创建一个指定容量的 BufferPool,这将是一个独立的非共享的,每次调用都会返回新的BufferPool
func NewBuffer(size int) BufferPool {
return &bufferPool{
Pool: sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, size))
},
},
}
}

func newBufferPool(size int) unsafe.Pointer {
return unsafe.Pointer(&bufferPool{
Pool: sync.Pool{
Expand Down
Loading

0 comments on commit 78491dc

Please sign in to comment.