diff --git a/README.md b/README.md index 33e452b38..d08e67f0f 100644 --- a/README.md +++ b/README.md @@ -337,6 +337,7 @@ to use in most cases as it provides additional features: - Synchronous or asynchronous writes of messages to Kafka. - Asynchronous cancellation using contexts. - Flushing of pending messages on close to support graceful shutdowns. +- Creation of a missing topic before publishing a message. *Note!* it was the default behaviour up to the version `v0.4.30`. ```go // make a writer that produces to topic-A, using the least-bytes distribution @@ -369,6 +370,55 @@ if err := w.Close(); err != nil { } ``` +### Missing topic creation before publication + +```go +// Make a writer that publishes messages to topic-A. +// The topic will be created if it is missing. +w := &Writer{ + Addr: TCP("localhost:9092"), + Topic: "topic-A", + AllowAutoTopicCreation: true, +} + +messages := []kafka.Message{ + { + Key: []byte("Key-A"), + Value: []byte("Hello World!"), + }, + { + Key: []byte("Key-B"), + Value: []byte("One!"), + }, + { + Key: []byte("Key-C"), + Value: []byte("Two!"), + }, +} + +var err error +const retries = 3 +for i := 0; i < retries; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // attempt to create topic prior to publishing the message + err = w.WriteMessages(ctx, messages...) + if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { + time.Sleep(time.Millisecond * 250) + continue + } + + if err != nil { + log.Fatalf("unexpected error %v", err) + } +} + +if err := w.Close(); err != nil { + log.Fatal("failed to close writer:", err) +} +``` + ### Writing to multiple topics Normally, the `WriterConfig.Topic` is used to initialize a single-topic writer. diff --git a/writer.go b/writer.go index cdbe07129..89ac87192 100644 --- a/writer.go +++ b/writer.go @@ -185,6 +185,9 @@ type Writer struct { // If nil, DefaultTransport is used. Transport RoundTripper + // AllowAutoTopicCreation notifies writer to create topic is missing. + AllowAutoTopicCreation bool + // Manages the current set of partition-topic writers. group sync.WaitGroup mutex sync.Mutex @@ -733,7 +736,7 @@ func (w *Writer) partitions(ctx context.Context, topic string) (int, error) { // caching recent results (the kafka.Transport types does). r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{ TopicNames: []string{topic}, - AllowAutoTopicCreation: true, + AllowAutoTopicCreation: w.AllowAutoTopicCreation, }) if err != nil { return 0, err @@ -941,7 +944,7 @@ func newBatchQueue(initialSize int) batchQueue { bq := batchQueue{ queue: make([]*writeBatch, 0, initialSize), mutex: &sync.Mutex{}, - cond: &sync.Cond{}, + cond: &sync.Cond{}, } bq.cond.L = bq.mutex diff --git a/writer_test.go b/writer_test.go index 8737809f8..da95423fa 100644 --- a/writer_test.go +++ b/writer_test.go @@ -160,6 +160,10 @@ func TestWriter(t *testing.T) { scenario: "writing a message to a non-existant topic creates the topic", function: testWriterAutoCreateTopic, }, + { + scenario: "terminates on an attempt to write a message to a nonexistent topic", + function: testWriterTerminateMissingTopic, + }, } for _, test := range tests { @@ -712,6 +716,7 @@ func testWriterAutoCreateTopic(t *testing.T) { Topic: topic, Balancer: &RoundRobin{}, }) + w.AllowAutoTopicCreation = true defer w.Close() msg := Message{Key: []byte("key"), Value: []byte("Hello World")} @@ -737,6 +742,30 @@ func testWriterAutoCreateTopic(t *testing.T) { } } +func testWriterTerminateMissingTopic(t *testing.T) { + topic := makeTopic() + + transport := &Transport{} + defer transport.CloseIdleConnections() + + writer := &Writer{ + Addr: TCP("localhost:9092"), + Topic: topic, + Balancer: &RoundRobin{}, + RequiredAcks: RequireNone, + AllowAutoTopicCreation: false, + Transport: transport, + } + defer writer.Close() + + msg := Message{Value: []byte("FooBar")} + + if err := writer.WriteMessages(context.Background(), msg); err == nil { + t.Fatal("Kafka error [3] UNKNOWN_TOPIC_OR_PARTITION is expected") + return + } +} + type staticBalancer struct { partition int }