From da91759e946436612f6e31af58971162bf1d058d Mon Sep 17 00:00:00 2001 From: Dominic Barnes Date: Fri, 3 Jun 2022 09:24:23 -0700 Subject: [PATCH] Reader: allow config to return OffsetOutOfRange errors (#917) --- go.mod | 4 +++- go.sum | 10 ++++++---- kafka_test.go | 6 +++++- reader.go | 17 +++++++++++++++++ reader_test.go | 32 ++++++++++++++++++++++++++++++++ 5 files changed, 63 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index dffdfa9d4..5a3c6ae97 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,13 @@ module github.com/segmentio/kafka-go go 1.15 require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/klauspost/compress v1.14.2 github.com/pierrec/lz4/v4 v4.1.14 - github.com/stretchr/testify v1.6.1 + github.com/stretchr/testify v1.7.1 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/xdg/stringprep v1.0.0 // indirect golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 // indirect golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 + gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99 // indirect ) diff --git a/go.sum b/go.sum index 25361b443..06198a1b9 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw= github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE= @@ -7,8 +8,8 @@ github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFu github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= @@ -24,5 +25,6 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99 h1:dbuHpmKjkDzSOMKAWl10QNlgaZUd3V1q99xc81tt2Kc= +gopkg.in/yaml.v3 v3.0.0-20220512140231-539c8e751b99/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kafka_test.go b/kafka_test.go index a82e844aa..4c6af706c 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -183,5 +183,9 @@ func newTestKafkaLogger(t *testing.T, prefix string) Logger { func (l *testKafkaLogger) Printf(msg string, args ...interface{}) { l.T.Helper() - l.T.Logf(l.Prefix+" "+msg, args...) + if l.Prefix != "" { + l.T.Logf(l.Prefix+" "+msg, args...) + } else { + l.T.Logf(msg, args...) + } } diff --git a/reader.go b/reader.go index e24f134ff..c83c2389f 100644 --- a/reader.go +++ b/reader.go @@ -509,6 +509,12 @@ type ReaderConfig struct { // // The default is to try 3 times. MaxAttempts int + + // OffsetOutOfRangeError indicates that the reader should return an error in + // the event of an OffsetOutOfRange error, rather than retrying indefinitely. + // This flag is being added to retain backwards-compatibility, so it will be + // removed in a future version of kafka-go. + OffsetOutOfRangeError bool } // Validate method validates ReaderConfig properties. @@ -1191,6 +1197,9 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { stats: r.stats, isolationLevel: r.config.IsolationLevel, maxAttempts: r.config.MaxAttempts, + + // backwards-compatibility flags + offsetOutOfRangeError: r.config.OffsetOutOfRangeError, }).run(ctx, offset) }(ctx, key, offset, &r.join) } @@ -1216,6 +1225,8 @@ type reader struct { stats *readerStats isolationLevel IsolationLevel maxAttempts int + + offsetOutOfRangeError bool } type readerMessage struct { @@ -1249,12 +1260,18 @@ func (r *reader) run(ctx context.Context, offset int64) { conn, start, err := r.initialize(ctx, offset) if err != nil { if errors.Is(err, OffsetOutOfRange) { + if r.offsetOutOfRangeError { + r.sendError(ctx, err) + return + } + // This would happen if the requested offset is passed the last // offset on the partition leader. In that case we're just going // to retry later hoping that enough data has been produced. r.withErrorLogger(func(log Logger) { log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err) }) + continue } diff --git a/reader_test.go b/reader_test.go index 266cf893d..d73bdfbe3 100644 --- a/reader_test.go +++ b/reader_test.go @@ -62,6 +62,11 @@ func TestReader(t *testing.T) { scenario: "reading from an out-of-range offset waits until the context is cancelled", function: testReaderOutOfRangeGetsCanceled, }, + + { + scenario: "topic being recreated will return an error", + function: testReaderTopicRecreated, + }, } for _, test := range tests { @@ -78,6 +83,7 @@ func TestReader(t *testing.T) { MinBytes: 1, MaxBytes: 10e6, MaxWait: 100 * time.Millisecond, + Logger: newTestKafkaLogger(t, ""), }) defer r.Close() testFunc(t, ctx, r) @@ -1950,3 +1956,29 @@ func createTopicWithCompaction(t *testing.T, topic string, partitions int) { defer cancel() waitForTopic(ctx, t, topic) } + +// The current behavior of the Reader is to retry OffsetOutOfRange errors +// indefinitely, which results in programs hanging in the event of a topic being +// re-created while a consumer is running. To retain backwards-compatibility, +// ReaderConfig.OffsetOutOfRangeError is being used to instruct the Reader to +// return an error in this case instead, allowing callers to react. +func testReaderTopicRecreated(t *testing.T, ctx context.Context, r *Reader) { + r.config.OffsetOutOfRangeError = true + + topic := r.config.Topic + + // add 1 message to the topic + prepareReader(t, ctx, r, makeTestSequence(1)...) + + // consume the message (moving the offset from 0 -> 1) + _, err := r.ReadMessage(ctx) + require.NoError(t, err) + + // destroy the topic, then recreate it so the offset now becomes 0 + deleteTopic(t, topic) + createTopic(t, topic, 1) + + // expect an error, since the offset should now be out of range + _, err = r.ReadMessage(ctx) + require.ErrorIs(t, err, OffsetOutOfRange) +}