Skip to content

Commit b7aacc1

Browse files
authored
fix: correct algorithms for exponential and linear backoff and channel drain (#77)
1 parent c74bd35 commit b7aacc1

File tree

3 files changed

+43
-7
lines changed

3 files changed

+43
-7
lines changed

backoff.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@ type BackOff func(currentRetryCount int) time.Duration
1212
func ExponentialBackOff(minTimeout time.Duration) BackOff {
1313
return func(currentRetryCount int) time.Duration {
1414
jitter := rand.Float64()
15-
strategy := 1 << currentRetryCount
16-
backoff := (1 + float64(strategy)*jitter) * minTimeout.Seconds() * float64(time.Second)
15+
jitterMax := 200 * time.Millisecond
16+
if currentRetryCount < 1 {
17+
currentRetryCount = 1
18+
}
19+
strategy := 1 << (currentRetryCount - 1)
20+
backoff := (float64(strategy) * float64(minTimeout.Nanoseconds())) + (jitter * float64(jitterMax.Nanoseconds()))
1721

1822
return time.Duration(backoff)
1923
}
@@ -23,9 +27,11 @@ func ExponentialBackOff(minTimeout time.Duration) BackOff {
2327
func LinearBackOff(minTimeout time.Duration) BackOff {
2428
return func(currentRetryCount int) time.Duration {
2529
jitter := rand.Float64()
26-
strategy := float64(currentRetryCount)
27-
28-
backoff := (1 + strategy*jitter) * minTimeout.Seconds() * float64(time.Second)
30+
jitterMax := 200 * time.Millisecond
31+
if currentRetryCount < 1 {
32+
currentRetryCount = 1
33+
}
34+
backoff := (float64(currentRetryCount) * float64(minTimeout.Nanoseconds())) + (jitter * float64(jitterMax.Nanoseconds()))
2935
return time.Duration(backoff)
3036
}
3137
}

chans/chans.go

+20-1
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,27 @@ func RangeBig(from, to *big.Int) <-chan *big.Int {
172172
return newCh
173173
}
174174

175+
// DrainOpen enumerates all items from the channel and discards them.
176+
// Returns number of items drained as soon as channel is empty.
177+
func DrainOpen[T any](channel <-chan T) int {
178+
drained := 0
179+
180+
for {
181+
select {
182+
case _, ok := <-channel:
183+
if ok {
184+
drained++
185+
} else {
186+
return drained
187+
}
188+
default:
189+
return drained
190+
}
191+
}
192+
}
193+
175194
// Drain enumerates all items from the channel and discards them.
176-
// Returns number of items drained.
195+
// Blocks until the channel is closed and returns number of items drained.
177196
func Drain[T any](channel <-chan T) int {
178197
drained := 0
179198
for range channel {

chans/chans_test.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func TestPushPop(t *testing.T) {
203203

204204
func TestDrain(t *testing.T) {
205205
c := make(chan string, 10)
206-
206+
207207
c <- "a"
208208
c <- "b"
209209
c <- "c"
@@ -214,3 +214,14 @@ func TestDrain(t *testing.T) {
214214
_, ok := <-c
215215
assert.False(t, ok)
216216
}
217+
218+
func TestDrainOpen(t *testing.T) {
219+
o := make(chan string, 10)
220+
221+
o <- "a"
222+
o <- "b"
223+
o <- "c"
224+
o <- "d"
225+
226+
assert.Equal(t, 4, chans.DrainOpen(o))
227+
}

0 commit comments

Comments
 (0)