Skip to content

Commit 7291db1

Browse files
committed
feat(producer): add MaxBufferBytes to limit retry buffer size
Signed-off-by: Wenli Wan <[email protected]>
1 parent 1759595 commit 7291db1

File tree

3 files changed

+124
-62
lines changed

3 files changed

+124
-62
lines changed

async_producer.go

+41-17
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,16 @@ import (
1616
// ErrProducerRetryBufferOverflow is returned when the bridging retry buffer is full and OOM prevention needs to be applied.
1717
var ErrProducerRetryBufferOverflow = errors.New("retry buffer full: message discarded to prevent buffer overflow")
1818

19-
// minFunctionalRetryBufferLength is the lower limit of Producer.Retry.MaxBufferLength for it to function.
20-
// Any non-zero maxBufferLength but less than this lower limit is pushed to the lower limit.
21-
const minFunctionalRetryBufferLength = 4 * 1024
19+
const (
20+
// minFunctionalRetryBufferLength defines the minimum number of messages the retry buffer must support.
21+
// If Producer.Retry.MaxBufferLength is set to a positive value below this limit, it is raised to this value.
22+
// This ensures the retry buffer remains functional under typical workloads.
23+
minFunctionalRetryBufferLength = 4 * 1024
24+
// minFunctionalRetryBufferBytes defines the minimum total byte size the retry buffer must support.
25+
// If Producer.Retry.MaxBufferBytes is set to a positive value below this limit, it is raised to this value.
26+
// A 32 MB lower limit ensures sufficient capacity for retrying larger messages without exhausting resources.
27+
minFunctionalRetryBufferBytes = 32 * 1024 * 1024
28+
)
2229

2330
// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
2431
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
@@ -1214,11 +1221,22 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
12141221
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
12151222
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
12161223
func (p *asyncProducer) retryHandler() {
1217-
maxBufferSize := p.conf.Producer.Retry.MaxBufferLength
1218-
if 0 < maxBufferSize && maxBufferSize < minFunctionalRetryBufferLength {
1219-
maxBufferSize = minFunctionalRetryBufferLength
1224+
maxBufferLength := p.conf.Producer.Retry.MaxBufferLength
1225+
if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength {
1226+
maxBufferLength = minFunctionalRetryBufferLength
1227+
}
1228+
1229+
maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes
1230+
if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes {
1231+
maxBufferBytes = minFunctionalRetryBufferBytes
12201232
}
12211233

1234+
version := 1
1235+
if p.conf.Version.IsAtLeast(V0_11_0_0) {
1236+
version = 2
1237+
}
1238+
1239+
var currentByteSize int64
12221240
var msg *ProducerMessage
12231241
buf := queue.New()
12241242

@@ -1229,7 +1247,8 @@ func (p *asyncProducer) retryHandler() {
12291247
select {
12301248
case msg = <-p.retries:
12311249
case p.input <- buf.Peek().(*ProducerMessage):
1232-
buf.Remove()
1250+
msgToRemove := buf.Remove().(*ProducerMessage)
1251+
currentByteSize -= int64(msgToRemove.ByteSize(version))
12331252
continue
12341253
}
12351254
}
@@ -1239,17 +1258,22 @@ func (p *asyncProducer) retryHandler() {
12391258
}
12401259

12411260
buf.Add(msg)
1261+
currentByteSize += int64(msg.ByteSize(version))
12421262

1243-
if maxBufferSize > 0 && buf.Length() >= maxBufferSize {
1244-
msgToHandle := buf.Peek().(*ProducerMessage)
1245-
if msgToHandle.flags == 0 {
1246-
select {
1247-
case p.input <- msgToHandle:
1248-
buf.Remove()
1249-
default:
1250-
buf.Remove()
1251-
p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
1252-
}
1263+
if (maxBufferLength <= 0 || buf.Length() < maxBufferLength) && (maxBufferBytes <= 0 || currentByteSize < maxBufferBytes) {
1264+
continue
1265+
}
1266+
1267+
msgToHandle := buf.Peek().(*ProducerMessage)
1268+
if msgToHandle.flags == 0 {
1269+
select {
1270+
case p.input <- msgToHandle:
1271+
buf.Remove()
1272+
currentByteSize -= int64(msgToHandle.ByteSize(version))
1273+
default:
1274+
buf.Remove()
1275+
currentByteSize -= int64(msgToHandle.ByteSize(version))
1276+
p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
12531277
}
12541278
}
12551279
}

async_producer_test.go

+76-45
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ package sarama
44

55
import (
66
"errors"
7+
"github.com/stretchr/testify/assert"
78
"log"
89
"math"
910
"os"
1011
"os/signal"
1112
"strconv"
13+
"strings"
1214
"sync"
1315
"sync/atomic"
1416
"testing"
@@ -2176,7 +2178,7 @@ func TestTxnCanAbort(t *testing.T) {
21762178
require.NoError(t, err)
21772179
}
21782180

2179-
func TestPreventRetryBufferOverflow(t *testing.T) {
2181+
func TestProducerRetryBufferLimits(t *testing.T) {
21802182
broker := NewMockBroker(t, 1)
21812183
defer broker.Close()
21822184
topic := "test-topic"
@@ -2199,57 +2201,86 @@ func TestPreventRetryBufferOverflow(t *testing.T) {
21992201
"MetadataRequest": metadataRequestHandlerFunc,
22002202
})
22012203

2202-
config := NewTestConfig()
2203-
config.Producer.Flush.MaxMessages = 1
2204-
config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength
2205-
config.Producer.Return.Successes = true
2206-
2207-
producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
2208-
if err != nil {
2209-
t.Fatal(err)
2204+
tests := []struct {
2205+
name string
2206+
configureBuffer func(*Config)
2207+
messageSize int
2208+
numMessages int
2209+
}{
2210+
{
2211+
name: "MaxBufferLength",
2212+
configureBuffer: func(config *Config) {
2213+
config.Producer.Flush.MaxMessages = 1
2214+
config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength
2215+
},
2216+
messageSize: 1, // Small message size
2217+
numMessages: 10000,
2218+
},
2219+
{
2220+
name: "MaxBufferBytes",
2221+
configureBuffer: func(config *Config) {
2222+
config.Producer.Flush.MaxMessages = 1
2223+
config.Producer.Retry.MaxBufferBytes = minFunctionalRetryBufferBytes
2224+
},
2225+
messageSize: 950 * 1024, // 950 KB
2226+
numMessages: 1000,
2227+
},
22102228
}
22112229

2212-
var (
2213-
wg sync.WaitGroup
2214-
successes, producerErrors int
2215-
errorFound bool
2216-
)
2217-
2218-
wg.Add(1)
2219-
go func() {
2220-
defer wg.Done()
2221-
for range producer.Successes() {
2222-
successes++
2223-
}
2224-
}()
2230+
for _, tt := range tests {
2231+
t.Run(tt.name, func(t *testing.T) {
2232+
config := NewTestConfig()
2233+
config.Producer.Return.Successes = true
2234+
tt.configureBuffer(config)
22252235

2226-
wg.Add(1)
2227-
go func() {
2228-
defer wg.Done()
2229-
for errMsg := range producer.Errors() {
2230-
if errors.Is(errMsg.Err, ErrProducerRetryBufferOverflow) {
2231-
errorFound = true
2236+
producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
2237+
if err != nil {
2238+
t.Fatal(err)
22322239
}
2233-
producerErrors++
2234-
}
2235-
}()
22362240

2237-
numMessages := 100000
2238-
for i := 0; i < numMessages; i++ {
2239-
kv := StringEncoder(strconv.Itoa(i))
2240-
producer.Input() <- &ProducerMessage{
2241-
Topic: topic,
2242-
Key: kv,
2243-
Value: kv,
2244-
Metadata: i,
2245-
}
2246-
}
2241+
var (
2242+
wg sync.WaitGroup
2243+
successes, producerErrors int
2244+
errorFound bool
2245+
)
2246+
2247+
wg.Add(1)
2248+
go func() {
2249+
defer wg.Done()
2250+
for range producer.Successes() {
2251+
successes++
2252+
}
2253+
}()
2254+
2255+
wg.Add(1)
2256+
go func() {
2257+
defer wg.Done()
2258+
for errMsg := range producer.Errors() {
2259+
if errors.Is(errMsg.Err, ErrProducerRetryBufferOverflow) {
2260+
errorFound = true
2261+
}
2262+
producerErrors++
2263+
}
2264+
}()
22472265

2248-
producer.AsyncClose()
2249-
wg.Wait()
2266+
longString := strings.Repeat("a", tt.messageSize)
2267+
val := StringEncoder(longString)
22502268

2251-
require.Equal(t, successes+producerErrors, numMessages, "Expected all messages to be processed")
2252-
require.True(t, errorFound, "Expected at least one error matching ErrProducerRetryBufferOverflow")
2269+
for i := 0; i < tt.numMessages; i++ {
2270+
msg := &ProducerMessage{
2271+
Topic: topic,
2272+
Value: val,
2273+
}
2274+
producer.Input() <- msg
2275+
}
2276+
2277+
producer.AsyncClose()
2278+
wg.Wait()
2279+
2280+
assert.Equal(t, successes+producerErrors, tt.numMessages, "Expected all messages to be processed")
2281+
assert.True(t, errorFound, "Expected at least one error matching ErrProducerRetryBufferOverflow")
2282+
})
2283+
}
22532284
}
22542285

22552286
// This example shows how to use the producer while simultaneously

config.go

+7
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,13 @@ type Config struct {
276276
// Any positive value below 4096 is raised to 4096.
277277
// A zero or negative value indicates unlimited.
278278
MaxBufferLength int
279+
// The maximum total byte size of messages in the bridging buffer between `input`
280+
// and `retries` channels in AsyncProducer#retryHandler.
281+
// This limit prevents the buffer from consuming excessive memory.
282+
// Defaults to 0 for unlimited.
283+
// Any positive value below 32 MB is raised to 32 MB.
284+
// A zero or negative value indicates unlimited.
285+
MaxBufferBytes int64
279286
}
280287

281288
// Interceptors to be called when the producer dispatcher reads the

0 commit comments

Comments
 (0)