diff --git a/transport.go b/transport.go
index 52e067e47..ebe82e17c 100644
--- a/transport.go
+++ b/transport.go
@@ -344,14 +344,32 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error)
 	switch m := req.(type) {
 	case *meta.Request:
-		// We serve metadata requests directly from the transport cache.
+		// We serve metadata requests directly from the transport cache unless
+		// we would like to auto-create a topic that isn't in our cache.
 		//
 		// This reduces the number of round trips to kafka brokers while keeping
 		// the logic simple when applying partitioning strategies.
 		if state.err != nil {
 			return nil, state.err
 		}
-		return filterMetadataResponse(m, state.metadata), nil
+
+		cachedMeta := filterMetadataResponse(m, state.metadata)
+		// requestNeeded indicates if we need to send this metadata request to the server.
+		// It's true when we want to auto-create topics and we don't have the topic in our
+		// cache.
+		var requestNeeded bool
+		if m.AllowAutoTopicCreation {
+			for _, topic := range cachedMeta.Topics {
+				if topic.ErrorCode == int16(UnknownTopicOrPartition) {
+					requestNeeded = true
+					break
+				}
+			}
+		}
+
+		if !requestNeeded {
+			return cachedMeta, nil
+		}
 
 	case protocol.Splitter:
 		// Messages that implement the Splitter interface trigger the creation of
@@ -392,6 +410,14 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error)
 		}
 		p.refreshMetadata(ctx, topicsToRefresh)
+	case *meta.Response:
+		m := req.(*meta.Request)
+		// If we get here with allow auto topic creation then
+		// we didn't have that topic in our cache, so we should update
+		// the cache.
+		if m.AllowAutoTopicCreation {
+			p.refreshMetadata(ctx, m.TopicNames)
+		}
 	}
 
 	return r, nil
diff --git a/writer.go b/writer.go
index 5d44f75c7..8fc34c4f0 100644
--- a/writer.go
+++ b/writer.go
@@ -702,7 +702,8 @@ func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
 	// It is expected that the transport will optimize this request by
 	// caching recent results (the kafka.Transport types does).
 	r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
-		TopicNames: []string{topic},
+		TopicNames:             []string{topic},
+		AllowAutoTopicCreation: true,
 	})
 	if err != nil {
 		return 0, err
diff --git a/writer_test.go b/writer_test.go
index 6994c1a34..8737809f8 100644
--- a/writer_test.go
+++ b/writer_test.go
@@ -2,6 +2,7 @@ package kafka
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"math"
@@ -155,6 +156,10 @@ func TestWriter(t *testing.T) {
 			scenario: "writing a message to an invalid partition",
 			function: testWriterInvalidPartition,
 		},
+		{
+			scenario: "writing a message to a non-existent topic creates the topic",
+			function: testWriterAutoCreateTopic,
+		},
 	}
 
 	for _, test := range tests {
@@ -698,6 +703,40 @@ func testWriterUnexpectedMessageTopic(t *testing.T) {
 	}
 }
 
+func testWriterAutoCreateTopic(t *testing.T) {
+	topic := makeTopic()
+	// Assume it's going to get created.
+	defer deleteTopic(t, topic)
+
+	w := newTestWriter(WriterConfig{
+		Topic:    topic,
+		Balancer: &RoundRobin{},
+	})
+	defer w.Close()
+
+	msg := Message{Key: []byte("key"), Value: []byte("Hello World")}
+
+	var err error
+	const retries = 5
+	for i := 0; i < retries; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+		err = w.WriteMessages(ctx, msg)
+		if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
+			time.Sleep(time.Millisecond * 250)
+			continue
+		}
+
+		if err != nil {
+			t.Errorf("unexpected error %v", err)
+			return
+		}
+	}
+	if err != nil {
+		t.Errorf("unable to create topic %v", err)
+	}
+}
+
 type staticBalancer struct {
 	partition int
 }
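For context, here is a minimal sketch of the producer-side behavior this patch enables: a `kafka.Writer` pointed at a topic that does not exist yet now sends a metadata request with `AllowAutoTopicCreation` set, so the brokers create the topic, assuming they run with `auto.create.topics.enable=true`. The topic name and broker address below are placeholders, and the retry loop mirrors the test above because the first write after auto-creation can race leader election.

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Placeholder topic name and broker address; the broker must run with
	// auto.create.topics.enable=true for the writer's metadata request to
	// create the topic.
	w := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9092"),
		Topic:    "example-autocreate",
		Balancer: &kafka.LeastBytes{},
	}
	defer w.Close()

	msg := kafka.Message{Key: []byte("key"), Value: []byte("Hello World")}

	// The first write can fail while the new topic elects a leader, so retry
	// on LeaderNotAvailable the same way the test above does.
	for i := 0; i < 5; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		err := w.WriteMessages(ctx, msg)
		cancel()
		if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
			time.Sleep(250 * time.Millisecond)
			continue
		}
		if err != nil {
			log.Fatalf("write failed: %v", err)
		}
		break
	}
}
```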