From cf40a01d38fce6a3233e8acca393c638e6640416 Mon Sep 17 00:00:00 2001 From: Artur Shamsutdinov Date: Sat, 29 Oct 2022 04:27:35 +1100 Subject: [PATCH] Add WriteBackoffMin/Max config to Writer (#1015) * Added Writer config to configure min and max delay between reties. No changes in the logic or in default values. * Empty commit * Update writer.go Co-authored-by: Achille --- writer.go | 86 ++++++++++++++++++++++++++++++++++---------------- writer_test.go | 17 ++++++++++ 2 files changed, 75 insertions(+), 28 deletions(-) diff --git a/writer.go b/writer.go index 47ee439a8..0cfbf9558 100644 --- a/writer.go +++ b/writer.go @@ -113,6 +113,18 @@ type Writer struct { // The default is to try at most 10 times. MaxAttempts int + // WriteBackoffMin optionally sets the smallest amount of time the writer waits before + // it attempts to write a batch of messages + // + // Default: 100ms + WriteBackoffMin time.Duration + + // WriteBackoffMax optionally sets the maximum amount of time the writer waits before + // it attempts to write a batch of messages + // + // Default: 1s + WriteBackoffMax time.Duration + // Limit on how many messages will be buffered before being sent to a // partition. // @@ -360,13 +372,15 @@ type WriterStats struct { BatchSize SummaryStats `metric:"kafka.writer.batch.size"` BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"` - MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"` - MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"` - BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"` - ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"` - WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"` - RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"` - Async bool `metric:"kafka.writer.async" type:"gauge"` + MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"` + WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min" type:"gauge"` + WriteBackoffMax time.Duration `metric:"kafka.writer.backoff.max" type:"gauge"` + MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"` + BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"` + ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"` + WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"` + RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"` + Async bool `metric:"kafka.writer.async" type:"gauge"` Topic string `tag:"topic"` @@ -759,6 +773,20 @@ func (w *Writer) maxAttempts() int { return 10 } +func (w *Writer) writeBackoffMin() time.Duration { + if w.WriteBackoffMin > 0 { + return w.WriteBackoffMin + } + return 100 * time.Millisecond +} + +func (w *Writer) writeBackoffMax() time.Duration { + if w.WriteBackoffMax > 0 { + return w.WriteBackoffMax + } + return 1 * time.Second +} + func (w *Writer) batchSize() int { if w.BatchSize > 0 { return w.BatchSize @@ -829,26 +857,28 @@ func (w *Writer) stats() *writerStats { func (w *Writer) Stats() WriterStats { stats := w.stats() return WriterStats{ - Dials: stats.dials.snapshot(), - Writes: stats.writes.snapshot(), - Messages: stats.messages.snapshot(), - Bytes: stats.bytes.snapshot(), - Errors: stats.errors.snapshot(), - DialTime: stats.dialTime.snapshotDuration(), - BatchTime: stats.batchTime.snapshotDuration(), - WriteTime: stats.writeTime.snapshotDuration(), - WaitTime: stats.waitTime.snapshotDuration(), - Retries: stats.retries.snapshot(), - BatchSize: stats.batchSize.snapshot(), - BatchBytes: stats.batchSizeBytes.snapshot(), - MaxAttempts: int64(w.MaxAttempts), - MaxBatchSize: int64(w.BatchSize), - BatchTimeout: w.BatchTimeout, - ReadTimeout: w.ReadTimeout, - WriteTimeout: w.WriteTimeout, - RequiredAcks: int64(w.RequiredAcks), - Async: w.Async, - Topic: w.Topic, + Dials: stats.dials.snapshot(), + Writes: stats.writes.snapshot(), + Messages: stats.messages.snapshot(), + Bytes: stats.bytes.snapshot(), + Errors: stats.errors.snapshot(), + DialTime: stats.dialTime.snapshotDuration(), + BatchTime: stats.batchTime.snapshotDuration(), + WriteTime: stats.writeTime.snapshotDuration(), + WaitTime: stats.waitTime.snapshotDuration(), + Retries: stats.retries.snapshot(), + BatchSize: stats.batchSize.snapshot(), + BatchBytes: stats.batchSizeBytes.snapshot(), + MaxAttempts: int64(w.MaxAttempts), + WriteBackoffMin: w.WriteBackoffMin, + WriteBackoffMax: w.WriteBackoffMax, + MaxBatchSize: int64(w.BatchSize), + BatchTimeout: w.BatchTimeout, + ReadTimeout: w.ReadTimeout, + WriteTimeout: w.WriteTimeout, + RequiredAcks: int64(w.RequiredAcks), + Async: w.Async, + Topic: w.Topic, } } @@ -1066,7 +1096,7 @@ func (ptw *partitionWriter) writeBatch(batch *writeBatch) { // guarantees to abort, but may be better to avoid long wait times // on close. // - delay := backoff(attempt, 100*time.Millisecond, 1*time.Second) + delay := backoff(attempt, ptw.w.writeBackoffMin(), ptw.w.writeBackoffMax()) ptw.w.withLogger(func(log Logger) { log.Printf("backing off %s writing %d messages to %s (partition: %d)", delay, len(batch.msgs), key.topic, key.partition) }) diff --git a/writer_test.go b/writer_test.go index 04d012079..988ca59e2 100644 --- a/writer_test.go +++ b/writer_test.go @@ -170,6 +170,10 @@ func TestWriter(t *testing.T) { scenario: "writing a message with SASL Plain authentication", function: testWriterSasl, }, + { + scenario: "test default configuration values", + function: testWriterDefaults, + }, } for _, test := range tests { @@ -818,6 +822,19 @@ func testWriterSasl(t *testing.T) { } } +func testWriterDefaults(t *testing.T) { + w := &Writer{} + defer w.Close() + + if w.writeBackoffMin() != 100*time.Millisecond { + t.Error("Incorrect default min write backoff delay") + } + + if w.writeBackoffMax() != 1*time.Second { + t.Error("Incorrect default max write backoff delay") + } +} + type staticBalancer struct { partition int }