Skip to content

Commit 756051e

Browse files
committed
feat: exponential backoff for clients
Signed-off-by: Wenli Wan <[email protected]>
1 parent 9ae475a commit 756051e

File tree

3 files changed

+161
-2
lines changed

3 files changed

+161
-2
lines changed

async_producer_test.go

+67-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package sarama
44

55
import (
66
"errors"
7-
"github.com/stretchr/testify/assert"
87
"log"
98
"math"
109
"os"
@@ -16,6 +15,8 @@ import (
1615
"testing"
1716
"time"
1817

18+
"github.com/stretchr/testify/assert"
19+
1920
"github.com/fortytw2/leaktest"
2021
"github.com/rcrowley/go-metrics"
2122
"github.com/stretchr/testify/require"
@@ -638,6 +639,71 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
638639
}
639640
}
640641

642+
func TestAsyncProducerWithExponentialBackoffDurations(t *testing.T) {
643+
var backoffDurations []time.Duration
644+
var mu sync.Mutex
645+
646+
topic := "my_topic"
647+
maxBackoff := 2 * time.Second
648+
config := NewTestConfig()
649+
650+
initBackoff := 100 * time.Millisecond
651+
innerBackoffFunc := NewExponentialBackoff(initBackoff, maxBackoff)
652+
backoffFunc := func(retries, maxRetries int) time.Duration {
653+
duration := innerBackoffFunc(retries, maxRetries)
654+
mu.Lock()
655+
backoffDurations = append(backoffDurations, duration)
656+
mu.Unlock()
657+
return duration
658+
}
659+
660+
config.Producer.Flush.Messages = 5
661+
config.Producer.Return.Successes = true
662+
config.Producer.Retry.Max = 3
663+
config.Producer.Retry.BackoffFunc = backoffFunc
664+
665+
broker := NewMockBroker(t, 1)
666+
667+
metadataResponse := new(MetadataResponse)
668+
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
669+
metadataResponse.AddTopicPartition(topic, 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
670+
broker.Returns(metadataResponse)
671+
672+
producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
673+
if err != nil {
674+
t.Fatal(err)
675+
}
676+
677+
failResponse := new(ProduceResponse)
678+
failResponse.AddTopicPartition(topic, 0, ErrNotLeaderForPartition)
679+
successResponse := new(ProduceResponse)
680+
successResponse.AddTopicPartition(topic, 0, ErrNoError)
681+
682+
broker.Returns(failResponse)
683+
broker.Returns(metadataResponse)
684+
broker.Returns(failResponse)
685+
broker.Returns(metadataResponse)
686+
broker.Returns(successResponse)
687+
688+
for i := 0; i < 5; i++ {
689+
producer.Input() <- &ProducerMessage{Topic: topic, Value: StringEncoder("test")}
690+
}
691+
692+
expectResults(t, producer, 5, 0)
693+
closeProducer(t, producer)
694+
broker.Close()
695+
696+
assert.Greater(t, backoffDurations[0], time.Duration(0), "Expected first backoff duration to be greater than 0")
697+
698+
for i := 1; i < len(backoffDurations); i++ {
699+
assert.Greater(t, backoffDurations[i], time.Duration(0), "Expected backoff[%d] to be greater than 0", i)
700+
assert.GreaterOrEqual(t, backoffDurations[i], backoffDurations[i-1],
701+
"Expected backoff[%d] >= backoff[%d], but got %v < %v", i, i-1, backoffDurations[i], backoffDurations[i-1])
702+
assert.LessOrEqual(t, backoffDurations[i], maxBackoff,
703+
"Expected backoff[%d] <= maxBackoff, but got %v > %v", i, backoffDurations[i], maxBackoff)
704+
}
705+
}
706+
641707
// https://github.com/IBM/sarama/issues/2129
642708
func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) {
643709
// Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

utils.go

+43
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,15 @@ package sarama
33
import (
44
"bufio"
55
"fmt"
6+
"math/rand"
67
"net"
78
"regexp"
9+
"time"
10+
)
11+
12+
const (
	// defaultRetryBackoff replaces a non-positive initial backoff passed to NewExponentialBackoff.
	defaultRetryBackoff = 100 * time.Millisecond

	// defaultRetryMaxBackoff replaces a non-positive maximum backoff passed to NewExponentialBackoff.
	defaultRetryMaxBackoff = time.Second
)
916

1017
type none struct{}
@@ -344,3 +351,39 @@ func (v KafkaVersion) String() string {
344351

345352
return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
346353
}
354+
355+
// NewExponentialBackoff returns a function that implements an exponential backoff strategy with jitter.
356+
// It follows KIP-580, implementing the formula:
357+
// MIN(retry.backoff.max.ms, (retry.backoff.ms * 2**(failures - 1)) * random(0.8, 1.2))
358+
// This ensures retries start with `backoff` and exponentially increase until `maxBackoff`, with added jitter.
359+
// The behavior when `failures = 0` is not explicitly defined in KIP-580 and is left to implementation discretion.
360+
//
361+
// Example usage:
362+
//
363+
// backoffFunc := sarama.NewExponentialBackoff(config.Producer.Retry.Backoff, 2*time.Second)
364+
// config.Producer.Retry.BackoffFunc = backoffFunc
365+
func NewExponentialBackoff(backoff time.Duration, maxBackoff time.Duration) func(retries, maxRetries int) time.Duration {
366+
if backoff <= 0 {
367+
backoff = defaultRetryBackoff
368+
}
369+
if maxBackoff <= 0 {
370+
maxBackoff = defaultRetryMaxBackoff
371+
}
372+
373+
if backoff > maxBackoff {
374+
Logger.Println("Warning: backoff is greater than maxBackoff, using maxBackoff instead.")
375+
backoff = maxBackoff
376+
}
377+
378+
return func(retries, maxRetries int) time.Duration {
379+
if retries <= 0 {
380+
return backoff
381+
}
382+
383+
calculatedBackoff := backoff * time.Duration(1<<(retries-1))
384+
jitter := 0.8 + 0.4*rand.Float64()
385+
calculatedBackoff = time.Duration(float64(calculatedBackoff) * jitter)
386+
387+
return min(calculatedBackoff, maxBackoff)
388+
}
389+
}

utils_test.go

+51-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
package sarama
44

5-
import "testing"
5+
import (
6+
"testing"
7+
"time"
8+
)
69

710
func TestVersionCompare(t *testing.T) {
811
if V0_8_2_0.IsAtLeast(V0_8_2_1) {
@@ -95,3 +98,50 @@ func TestVersionParsing(t *testing.T) {
9598
}
9699
}
97100
}
101+
102+
func TestExponentialBackoffValidCases(t *testing.T) {
103+
testCases := []struct {
104+
backoff time.Duration
105+
maxBackoff time.Duration
106+
retries int
107+
maxRetries int
108+
minBackoff time.Duration
109+
maxBackoffExpected time.Duration
110+
}{
111+
{100 * time.Millisecond, 2 * time.Second, 1, 5, 80 * time.Millisecond, 120 * time.Millisecond},
112+
{100 * time.Millisecond, 2 * time.Second, 3, 5, 320 * time.Millisecond, 480 * time.Millisecond},
113+
{100 * time.Millisecond, 2 * time.Second, 5, 5, 1280 * time.Millisecond, 1920 * time.Millisecond},
114+
}
115+
116+
for _, tc := range testCases {
117+
backoffFunc := NewExponentialBackoff(tc.backoff, tc.maxBackoff)
118+
backoff := backoffFunc(tc.retries, tc.maxRetries)
119+
if backoff < tc.minBackoff || backoff > tc.maxBackoffExpected {
120+
t.Errorf("backoff(%d, %d): expected between %v and %v, got %v", tc.retries, tc.maxRetries, tc.minBackoff, tc.maxBackoffExpected, backoff)
121+
}
122+
}
123+
}
124+
125+
func TestExponentialBackoffDefaults(t *testing.T) {
126+
testCases := []struct {
127+
backoff time.Duration
128+
maxBackoff time.Duration
129+
expectedBackoff time.Duration
130+
expectedMaxBackoff time.Duration
131+
}{
132+
{-100 * time.Millisecond, 2 * time.Second, defaultRetryBackoff, 2 * time.Second},
133+
{100 * time.Millisecond, -2 * time.Second, 100 * time.Millisecond, defaultRetryMaxBackoff},
134+
{-100 * time.Millisecond, -2 * time.Second, defaultRetryBackoff, defaultRetryMaxBackoff},
135+
{0 * time.Millisecond, 2 * time.Second, defaultRetryBackoff, 2 * time.Second},
136+
{100 * time.Millisecond, 0 * time.Second, 100 * time.Millisecond, defaultRetryMaxBackoff},
137+
{0 * time.Millisecond, 0 * time.Second, defaultRetryBackoff, defaultRetryMaxBackoff},
138+
}
139+
140+
for _, tc := range testCases {
141+
backoffFunc := NewExponentialBackoff(tc.backoff, tc.maxBackoff)
142+
backoff := backoffFunc(2, 5)
143+
if backoff < tc.expectedBackoff || backoff > tc.expectedMaxBackoff {
144+
t.Errorf("backoff(%v, %v): expected between %v and %v, got %v", tc.backoff, tc.maxBackoff, tc.expectedBackoff, tc.expectedMaxBackoff, backoff)
145+
}
146+
}
147+
}

0 commit comments

Comments
 (0)