-
Notifications
You must be signed in to change notification settings - Fork 192
/
Copy pathpool.go
157 lines (137 loc) · 2.89 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package rueidis
import (
"context"
"errors"
"sync"
"time"
)
// errAcquireComplete is a special error used to indicate that the Acquire operation has completed successfully
var errAcquireComplete = errors.New("acquire complete")
func newPool(cap int, dead wire, cleanup time.Duration, minSize int, makeFn func(context.Context) wire) *pool {
if cap <= 0 {
cap = DefaultPoolSize
}
return &pool{
size: 0,
minSize: minSize,
cap: cap,
dead: dead,
make: makeFn,
list: make([]wire, 0, 4),
cond: sync.NewCond(&sync.Mutex{}),
cleanup: cleanup,
}
}
type pool struct {
dead wire
cond *sync.Cond
timer *time.Timer
make func(ctx context.Context) wire
list []wire
cleanup time.Duration
size int
minSize int
cap int
down bool
timerOn bool
}
func (p *pool) Acquire(ctx context.Context) (v wire) {
p.cond.L.Lock()
// Set up ctx handling when waiting for an available connection
if len(p.list) == 0 && p.size == p.cap && !p.down && ctx.Err() == nil && ctx.Done() != nil {
poolCtx, cancel := context.WithCancelCause(ctx)
defer cancel(errAcquireComplete)
go func() {
<-poolCtx.Done()
if context.Cause(poolCtx) != errAcquireComplete { // no need to broadcast if the poolCtx is cancelled explicitly.
p.cond.Broadcast()
}
}()
}
retry:
for len(p.list) == 0 && p.size == p.cap && !p.down && ctx.Err() == nil {
p.cond.Wait()
}
if ctx.Err() != nil {
deadPipe := deadFn()
deadPipe.error.Store(&errs{error: ctx.Err()})
v = deadPipe
p.cond.L.Unlock()
return v
}
if p.down {
v = p.dead
p.cond.L.Unlock()
return v
}
if len(p.list) == 0 {
p.size++
// unlock before start to make a new wire
// allowing others to make wires concurrently instead of waiting in line
p.cond.L.Unlock()
v = p.make(ctx)
return v
}
i := len(p.list) - 1
v = p.list[i]
p.list[i] = nil
p.list = p.list[:i]
if v.Error() != nil {
p.size--
v.Close()
goto retry
}
p.cond.L.Unlock()
return v
}
func (p *pool) Store(v wire) {
p.cond.L.Lock()
if !p.down && v.Error() == nil {
p.list = append(p.list, v)
p.startTimerIfNeeded()
} else {
p.size--
v.Close()
}
p.cond.L.Unlock()
p.cond.Signal()
}
func (p *pool) Close() {
p.cond.L.Lock()
p.down = true
p.stopTimer()
for _, w := range p.list {
w.Close()
}
p.cond.L.Unlock()
p.cond.Broadcast()
}
func (p *pool) startTimerIfNeeded() {
if p.cleanup == 0 || p.timerOn || len(p.list) <= p.minSize {
return
}
p.timerOn = true
if p.timer == nil {
p.timer = time.AfterFunc(p.cleanup, p.removeIdleConns)
} else {
p.timer.Reset(p.cleanup)
}
}
func (p *pool) removeIdleConns() {
p.cond.L.Lock()
defer p.cond.L.Unlock()
newLen := min(p.minSize, len(p.list))
for i, w := range p.list[newLen:] {
w.Close()
p.list[newLen+i] = nil
p.size--
}
p.list = p.list[:newLen]
p.timerOn = false
}
func (p *pool) stopTimer() {
p.timerOn = false
if p.timer != nil {
p.timer.Stop()
}
}