diff --git a/pool/benchmark/buffer_test.go b/pool/benchmark/buffer_test.go deleted file mode 100644 index 9ea8037..0000000 --- a/pool/benchmark/buffer_test.go +++ /dev/null @@ -1,103 +0,0 @@ -// MIT License -// Copyright (c) 2020 Qi Yin - -package benchmark - -import ( - "bytes" - "container/ring" - "math/rand" - "sync" - "testing" - - "github.com/thinkeridea/go-extend/pool" -) - -var bufferData = ring.New(256) - -func init() { - data := bufferData - for i := 0; i < 256; i++ { - data.Value = make([]byte, rand.Intn(1<<16)) - data = data.Next() - } -} - -func BenchmarkBufferPool(b *testing.B) { - buff := pool.NewBuffer(64) - p := [20]*bytes.Buffer{} - data := bufferData - for i := 0; i < b.N; i++ { - data.Next() - bf := buff.Get() - bf.Write(data.Value.([]byte)) - - idx := i % 20 - if v := p[idx]; v != nil { - buff.Put(v) - } - - p[idx] = bf - } -} - -func BenchmarkBufferSyncPool(b *testing.B) { - buff := sync.Pool{New: func() interface{} { - return bytes.NewBuffer(make([]byte, 0, 64)) - }} - - p := [20]*bytes.Buffer{} - data := bufferData - for i := 0; i < b.N; i++ { - data.Next() - bf := buff.Get().(*bytes.Buffer) - bf.Reset() - bf.Write(data.Value.([]byte)) - - idx := i % 20 - if v := p[idx]; v != nil { - buff.Put(v) - } - - p[idx] = bf - } -} - -// 用来测试在容量没有变化的情况下与原始方式的性能差异 -func BenchmarkBufferFixedSizePool(b *testing.B) { - buff := pool.NewBuffer(64) - p := [20]*bytes.Buffer{} - data := make([]byte, 50) - for i := 0; i < b.N; i++ { - bf := buff.Get() - bf.Write(data) - - idx := i % 20 - if v := p[idx]; v != nil { - buff.Put(v) - } - - p[idx] = bf - } -} - -func BenchmarkBufferFixedSizeSyncPool(b *testing.B) { - buff := sync.Pool{New: func() interface{} { - return bytes.NewBuffer(make([]byte, 0, 64)) - }} - - p := [20]*bytes.Buffer{} - data := make([]byte, 50) - for i := 0; i < b.N; i++ { - bf := buff.Get().(*bytes.Buffer) - bf.Reset() - bf.Write(data) - - idx := i % 20 - if v := p[idx]; v != nil { - buff.Put(v) - } - - p[idx] = bf - } -} diff --git a/pool/buffer.go b/pool/buffer.go index a72eaff..149e0d4 100644 --- a/pool/buffer.go +++ b/pool/buffer.go @@ -5,27 +5,12 @@ package pool import ( "bytes" - "math/bits" - "sort" "sync" - "sync/atomic" "unsafe" "github.com/thinkeridea/go-extend/exsync" ) -const ( - bucketSize = 22 - minSizeBits = 6 - minSize = 1 << minSizeBits // 2^6=64 - maxSize = 1 << (minSizeBits + bucketSize - 1) // 2^28=256MB - calibrateCallsThreshold = 10240 -) - -var ( - buffBucket = [bucketSize]sync.Pool{} -) - // BufferPool bytes.Buffer 的 sync.Pool 接口 // 可以直接 Get *bytes.Buffer 并 Reset Buffer type BufferPool interface { @@ -36,77 +21,6 @@ type BufferPool interface { Put(*bytes.Buffer) } -type buffer struct { - index uint32 - calibrating uint32 - release uint32 - calls [bucketSize]uint32 -} - -// NewBuffer 创建一个动态评估需求容量的 BufferPool,它们共享一个底层 bytes.Buffer 区间池 -// Deprecated: 这是错误测试产生的结果,它并没有相较 sync.Pool 有明显优势,详细参看: https://github.com/thinkeridea/go-extend/issues/17 -func NewBuffer(size int) BufferPool { - b := &buffer{} - b.index = uint32(buffBucketIndex(size)) - return b -} - -// Get 从 Pool 中获取一个 *bytes.Buffer 实例, 该实例已经被 Reset -func (p *buffer) Get() *bytes.Buffer { - idx := atomic.LoadUint32(&p.index) - v := buffBucket[idx].Get() - if v != nil { - b := v.(*bytes.Buffer) - b.Reset() - return b - } - - return bytes.NewBuffer(make([]byte, 0, minSize< calibrateCallsThreshold { - p.calibrate() - } -} - -func (p *buffer) calibrate() { - if !atomic.CompareAndSwapUint32(&p.calibrating, 0, 1) { - return - } - - var callSize [bucketSize]uint64 - for i := range callSize { - callSize[i] = uint64(atomic.SwapUint32(&p.calls[i], 0))<<32 | minSize< callSize[j] - }) - - atomic.SwapUint32(&p.index, uint32(buffBucketIndex(int(callSize[0]<<32>>32)))) - atomic.StoreUint32(&p.release, 0) - atomic.SwapUint32(&p.calibrating, 0) -} - -func buffBucketIndex(n int) int { - if n == 0 { - return 0 - } - - idx := bits.Len32(uint32((n - 1) >> minSizeBits)) - if idx >= bucketSize { - idx = bucketSize - 1 - } - return idx -} - var ( buff64 exsync.OncePointer buff128 exsync.OncePointer @@ -121,6 +35,17 @@ type bufferPool struct { sync.Pool } +// NewBuffer 创建一个指定容量的 BufferPool,这将是一个独立的非共享的,每次调用都会返回新的BufferPool +func NewBuffer(size int) BufferPool { + return &bufferPool{ + Pool: sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, size)) + }, + }, + } +} + func newBufferPool(size int) unsafe.Pointer { return unsafe.Pointer(&bufferPool{ Pool: sync.Pool{ diff --git a/pool/buffer_test.go b/pool/buffer_test.go index 41dfe86..2cd2d10 100644 --- a/pool/buffer_test.go +++ b/pool/buffer_test.go @@ -8,181 +8,24 @@ import ( "testing" ) -func TestBuffer_Get(t *testing.T) { - p := NewBuffer(64) - b := p.Get() - if b.Cap() != 64 { - t.Errorf("b.Cap():%d != 64", b.Cap()) - } - - if b.Len() != 0 { - t.Errorf("b.String():%s != xx", b.String()) +func TestNewBuffer(t *testing.T) { + p1 := NewBuffer(10) + p2 := NewBuffer(10) + if p1 == p2 { + t.Errorf("p1:%p == p2:%p", p1, p2) } - b.WriteString("xx") - if b.String() != "xx" { - t.Errorf("b.String():%s != xx", b.String()) - } - - // 开启 race 时有一定概率导致 Put 被丢弃 - for i := 0; i < 10; i++ { - b = p.Get() - if b.Len() != 0 { - t.Errorf("b.String():%s != xx", b.String()) - } - - if b.Cap() != 64 { - t.Errorf("b.Cap():%d != 64", b.Cap()) - } - p.Put(b) + b := p1.Get() + if c := b.Cap(); c != 10 { + t.Errorf("b.Cap:%d != 10", c) } -} -func TestBuffer_Put(t *testing.T) { - p := NewBuffer(64) - b := p.Get() b.WriteString("xx") if b.String() != "xx" { t.Errorf("b.String():%s != xx", b.String()) } - p.Put(b) - - bufp := p.(*buffer) - if bufp.release != 1 { - t.Errorf("release:%d != 1", bufp.release) - } - - if bufp.calls[0] != 1 { - t.Errorf("calls[0]:%d != 1", bufp.calls[0]) - } - - var bb *bytes.Buffer - // 开启 race 时有一定概率导致 Put 被丢弃 - pp := buffBucket[0] - var n uint32 = 1 - for i := 0; i < 10; i++ { - v := pp.Get() - if v == nil { - p.Put(b) - n++ - continue - } - - bb = v.(*bytes.Buffer) - if bb.String() == "xx" { - break - } - - p.Put(b) - n++ - } - - if bb.String() != "xx" { - t.Errorf("b1.String():%s != xx", bb.String()) - } - - if bufp.release != n { - t.Errorf("release:%d != %d", bufp.release, n) - } - - if bufp.calls[0] != n { - t.Errorf("calls[0]:%d != %d", bufp.calls[0], n) - } - - p.Put(bytes.NewBuffer(make([]byte, 1024))) - if bufp.release != n+1 { - t.Errorf("release:%d != %d", bufp.release, n+1) - } - - if bufp.calls[4] != 1 { - t.Errorf("calls[4]:%d != 1", bufp.calls[0]) - } - - for i := 0; i < bucketSize; i++ { - if i != 0 && i != 4 && bufp.calls[i] != 0 { - t.Errorf("calls[%d]:%d != 0", i, bufp.calls[0]) - } - } -} - -func TestBuffer_Calibrate(t *testing.T) { - p := NewBuffer(64) - for i := 0; i < 5; i++ { - for j := 0; j < 5; j++ { - p.Put(bytes.NewBuffer(make([]byte, minSize< minSizeBits { - n = i - minSizeBits - } - - if n >= bucketSize { - n = bucketSize - 1 - } - - if idx := buffBucketIndex(1 << i); idx != n { - t.Errorf("index(%d) :%d = %d", 1< minSizeBits { - if idx := buffBucketIndex(1<<(i-1) + 1); idx != n { - t.Errorf("index(%d) :%d != %d", 1<<(i-1)+1, idx, n) - } - } - } + p1.Put(b) } func TestGetBuff64(t *testing.T) { diff --git a/pool/std_buffer.go b/pool/std_buffer.go deleted file mode 100644 index 040c765..0000000 --- a/pool/std_buffer.go +++ /dev/null @@ -1,25 +0,0 @@ -// MIT License -// Copyright (c) 2020 Qi Yin - -package pool - -import ( - "bytes" - "unsafe" -) - -// stdBuffer 是 bytes.Buffer 的结构引用,这需要对各个go的版本进行测试,保证该结构的准确性 -type stdBuffer struct { - b []byte - off int -} - -// bufferLen 获取buf的实际使用长度, 默认使用 cap 作为降级方案 -func bufferLen(b *bytes.Buffer) int { - x := (*stdBuffer)(unsafe.Pointer(b)) - if x.off+b.Len() == len(x.b) { - return len(x.b) - } - - return b.Cap() -} diff --git a/pool/std_buffer_test.go b/pool/std_buffer_test.go deleted file mode 100644 index d1e0547..0000000 --- a/pool/std_buffer_test.go +++ /dev/null @@ -1,43 +0,0 @@ -// MIT License -// Copyright (c) 2020 Qi Yin - -package pool - -import ( - "bytes" - "reflect" - "testing" - "unsafe" -) - -func TestStdBuffer(t *testing.T) { - data := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} - buf := bytes.NewBuffer(make([]byte, 0, 20)) - buf.Write(data) - - r := make([]byte, 5) - buf.Read(r) - - x := (*stdBuffer)(unsafe.Pointer(buf)) - if !reflect.DeepEqual(x.b, data) { - t.Errorf("Buffer.buf(%v) != (%v)", x.b, data) - } - - if x.off != 5 { - t.Errorf("Buffer.off(%d) != 5", x.off) - } -} - -func TestBufferLen(t *testing.T) { - data := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} - buf := bytes.NewBuffer(make([]byte, 0, 20)) - buf.Write(data) - - r := make([]byte, 5) - buf.Read(r) - - n := bufferLen(buf) - if n != 10 { - t.Errorf("bufferLen(%d) != 10", n) - } -}