
Commit d5d3617
feat(consumer): add batch send deadline
1 parent 0194b1f

File tree: 5 files changed, +68 −37 lines

Diff for: README.md (+1)

@@ -29,6 +29,7 @@ To create new injectors for your topics, you should create a new kubernetes depl
 - `ELASTICSEARCH_DISABLE_SNIFFING` if set to "true", the client will not sniff Elasticsearch nodes during the node discovery process. Defaults to false. **OPTIONAL**
 - `KAFKA_CONSUMER_CONCURRENCY` Number of parallel goroutines working as a consumer. Default value is 1 **OPTIONAL**
 - `KAFKA_CONSUMER_BATCH_SIZE` Number of records to accumulate before sending them to Elasticsearch (for each goroutine). Default value is 100 **OPTIONAL**
+- `KAFKA_CONSUMER_BATCH_DEADLINE` If no new records are added to the batch after this time duration, the batch will be sent to Elasticsearch. Default value is 1m **OPTIONAL**
 - `ES_INDEX_COLUMN` Record field to append to index name. Ex: to create one ES index per campaign, use "campaign_id" here **OPTIONAL**
 - `ES_BLACKLISTED_COLUMNS` Comma separated list of record fields to filter before sending to Elasticsearch. Defaults to empty string. **OPTIONAL**
 - `ES_DOC_ID_COLUMN` Record field to be the document ID of Elasticsearch. Defaults to "kafkaRecordPartition:kafkaRecordOffset". **OPTIONAL**
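
The deadline value is parsed as a Go duration string via time.ParseDuration (see the src/injector/injector.go diff below), so values such as "30s", "1m", or "2m30s" are accepted. A minimal illustrative snippet, not part of this commit:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Each of these is a valid KAFKA_CONSUMER_BATCH_DEADLINE value.
    for _, v := range []string{"30s", "1m", "2m30s"} {
        d, err := time.ParseDuration(v)
        fmt.Println(v, "->", d, err) // e.g. "1m -> 1m0s <nil>"
    }
}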

Diff for: cmd/injector.go (+1)

@@ -42,6 +42,7 @@ func main() {
         ConsumerGroup:         os.Getenv("KAFKA_CONSUMER_GROUP"),
         Concurrency:           os.Getenv("KAFKA_CONSUMER_CONCURRENCY"),
         BatchSize:             os.Getenv("KAFKA_CONSUMER_BATCH_SIZE"),
+        BatchDeadline:         os.Getenv("KAFKA_CONSUMER_BATCH_DEADLINE"),
         BufferSize:            os.Getenv("KAFKA_CONSUMER_BUFFER_SIZE"),
         MetricsUpdateInterval: os.Getenv("KAFKA_CONSUMER_METRICS_UPDATE_INTERVAL"),
         RecordType:            os.Getenv("KAFKA_CONSUMER_RECORD_TYPE"),

Diff for: src/injector/injector.go (+6)

@@ -22,6 +22,11 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
         level.Warn(logger).Log("err", err, "message", "failed to get consumer batch size")
         batchSize = 100
     }
+    batchDeadline, err := time.ParseDuration(kafkaConfig.BatchDeadline)
+    if err != nil {
+        level.Warn(logger).Log("err", err, "message", "failed to get consumer batch deadline")
+        batchDeadline = time.Minute
+    }
     metricsUpdateInterval, err := time.ParseDuration(kafkaConfig.MetricsUpdateInterval)
     if err != nil {
         level.Warn(logger).Log("err", err, "message", "failed to get consumer metrics update interval")
@@ -54,6 +59,7 @@ func MakeKafkaConsumer(endpoints Endpoints, logger log.Logger, schemaRegistry *s
         Logger:                logger,
         Concurrency:           concurrency,
         BatchSize:             batchSize,
+        BatchDeadline:         batchDeadline,
         MetricsUpdateInterval: metricsUpdateInterval,
         BufferSize:            bufferSize,
         IncludeKey:            includeKey,
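
Since time.ParseDuration returns an error for an empty string, leaving KAFKA_CONSUMER_BATCH_DEADLINE unset also lands on the one-minute default above. A standalone sketch of that parse-or-default behaviour (the helper name batchDeadlineOrDefault is hypothetical, not part of this commit):

package main

import (
    "fmt"
    "os"
    "time"
)

// batchDeadlineOrDefault mirrors the fallback logic above: an unset or
// malformed value falls back to one minute.
func batchDeadlineOrDefault() time.Duration {
    d, err := time.ParseDuration(os.Getenv("KAFKA_CONSUMER_BATCH_DEADLINE"))
    if err != nil {
        // time.ParseDuration("") also errors, so an unset variable ends up here.
        return time.Minute
    }
    return d
}

func main() {
    fmt.Println(batchDeadlineOrDefault()) // prints 1m0s when the variable is unset
}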

Diff for: src/kafka/config.go (+1)

@@ -10,6 +10,7 @@ type Config struct {
     ConsumerGroup         string
     Concurrency           string
     BatchSize             string
+    BatchDeadline         string
     MetricsUpdateInterval string
     BufferSize            string
     RecordType            string

Diff for: src/kafka/consumer.go (+59 −37)

@@ -2,8 +2,8 @@ package kafka
 
 import (
     "context"
-    "os"
     "errors"
+    "os"
 
     "time"
 
@@ -12,9 +12,9 @@ import (
     "github.com/go-kit/kit/endpoint"
     "github.com/go-kit/kit/log"
     "github.com/go-kit/kit/log/level"
+    e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
     "github.com/inloco/kafka-elasticsearch-injector/src/metrics"
     "github.com/inloco/kafka-elasticsearch-injector/src/models"
-    e "github.com/inloco/kafka-elasticsearch-injector/src/errors"
 )
 
 type Notification int32
@@ -41,6 +41,7 @@ type Consumer struct {
     Logger                log.Logger
     Concurrency           int
     BatchSize             int
+    BatchDeadline         time.Duration
     MetricsUpdateInterval time.Duration
     BufferSize            int
     IncludeKey            bool
@@ -80,8 +81,9 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan<- Notification)
     defer consumer.Close()
 
     buffSize := k.consumer.BatchSize
+    batchDeadline := k.consumer.BatchDeadline
     for i := 0; i < concurrency; i++ {
-        go k.worker(consumer, buffSize, notifications)
+        go k.worker(consumer, buffSize, batchDeadline, notifications)
     }
     go func() {
         for {
@@ -134,45 +136,65 @@ func (k *kafka) Start(signals chan os.Signal, notifications chan<- Notification)
     }
 }
 
-func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, notifications chan<- Notification) {
+func (k *kafka) decodeMessages(buf []*sarama.ConsumerMessage, bufIdx int) []*models.Record {
+    decoded := make([]*models.Record, 0)
+    for i := 0; i < bufIdx; i++ {
+        req, err := k.consumer.Decoder(nil, buf[i], k.consumer.IncludeKey)
+        if err != nil {
+            if errors.Is(err, e.ErrNilMessage) {
+                continue
+            }
+
+            level.Error(k.consumer.Logger).Log(
+                "message", "Error decoding message",
+                "err", err.Error(),
+            )
+            continue
+        }
+        decoded = append(decoded, req)
+    }
+
+    return decoded
+}
+
+func (k *kafka) flushMessages(buf []*sarama.ConsumerMessage, bufIdx int, consumer *cluster.Consumer, notifications chan<- Notification) {
+    records := k.decodeMessages(buf, bufIdx)
+    for {
+        if res, err := k.consumer.Endpoint(context.Background(), records); err != nil {
+            level.Error(k.consumer.Logger).Log("message", "error on endpoint call", "err", err.Error())
+            var _ = res // ignore res (for now)
+            continue
+        }
+        break
+    }
+
+    notifications <- Inserted
+    k.metricsPublisher.IncrementRecordsConsumed(len(records))
+    for i := 0; i < bufIdx; i++ {
+        k.offsetCh <- &topicPartitionOffset{buf[i].Topic, buf[i].Partition, buf[i].Offset}
+        consumer.MarkOffset(buf[i], "") // mark message as processed
+    }
+}
+
+func (k *kafka) worker(consumer *cluster.Consumer, buffSize int, batchDeadline time.Duration, notifications chan<- Notification) {
     buf := make([]*sarama.ConsumerMessage, buffSize)
-    var decoded []*models.Record
+    var lastReceivedMsg time.Time
     idx := 0
     for {
-        kafkaMsg := <-k.consumerCh
-        buf[idx] = kafkaMsg
-        idx++
-        for idx == buffSize {
-            if decoded == nil {
-                for _, msg := range buf {
-                    req, err := k.consumer.Decoder(nil, msg, k.consumer.IncludeKey)
-                    if err != nil {
-                        if errors.Is(err, e.ErrNilMessage) {
-                            continue
-                        }
-
-                        level.Error(k.consumer.Logger).Log(
-                            "message", "Error decoding message",
-                            "err", err.Error(),
-                        )
-                        continue
-                    }
-                    decoded = append(decoded, req)
-                }
-            }
-            if res, err := k.consumer.Endpoint(context.Background(), decoded); err != nil {
-                level.Error(k.consumer.Logger).Log("message", "error on endpoint call", "err", err.Error())
-                var _ = res // ignore res (for now)
-                continue
+        select {
+        case kafkaMsg := <-k.consumerCh:
+            lastReceivedMsg = time.Now()
+            buf[idx] = kafkaMsg
+            idx++
+            if idx == buffSize {
+                k.flushMessages(buf, idx, consumer, notifications)
+                idx = 0
             }
-            notifications <- Inserted
-            k.metricsPublisher.IncrementRecordsConsumed(buffSize)
-            for _, msg := range buf {
-                k.offsetCh <- &topicPartitionOffset{msg.Topic, msg.Partition, msg.Offset}
-                consumer.MarkOffset(msg, "") // mark message as processed
+        default:
+            if idx > 0 && time.Since(lastReceivedMsg) > batchDeadline {
+                k.flushMessages(buf, idx, consumer, notifications)
+                idx = 0
             }
-            decoded = nil
-            idx = 0
         }
     }
 }
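
The reworked worker buffers messages through a non-blocking select: each received message updates lastReceivedMsg and joins the batch, a full batch is flushed immediately, and the default branch flushes a partial batch once no message has arrived for batchDeadline. Below is a minimal standalone sketch of the same flush-on-full-or-deadline pattern; the names batcher, in, and flush are hypothetical, not the project's API. As in the worker above, the default branch makes the loop spin while idle rather than block:

package main

import (
    "fmt"
    "time"
)

// batcher collects items from in and calls flush either when the batch holds
// three items (a stand-in for buffSize) or when no item has arrived for deadline.
func batcher(in <-chan string, deadline time.Duration, flush func([]string)) {
    buf := make([]string, 0, 3)
    var lastReceived time.Time
    for {
        select {
        case msg, ok := <-in:
            if !ok {
                if len(buf) > 0 {
                    flush(buf) // drain the remainder when the channel closes
                }
                return
            }
            lastReceived = time.Now()
            buf = append(buf, msg)
            if len(buf) == cap(buf) {
                flush(buf)
                buf = buf[:0]
            }
        default:
            // Non-blocking receive failed: flush a partial batch on deadline.
            if len(buf) > 0 && time.Since(lastReceived) > deadline {
                flush(buf)
                buf = buf[:0]
            }
        }
    }
}

func main() {
    in := make(chan string)
    go func() {
        in <- "a"
        in <- "b"
        time.Sleep(300 * time.Millisecond) // longer than the 100ms deadline below
        close(in)
    }()
    batcher(in, 100*time.Millisecond, func(batch []string) {
        fmt.Println("flushed:", batch)
    })
    // Prints "flushed: [a b]": the partial batch goes out on the deadline.
}

A blocking alternative would be to select on the message channel and a time.Ticker instead of a default branch; the commit itself uses the non-blocking form shown in the diff.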
