Topic validation now uses a Kafka producer per shard. (#82)
onukristo authored Jan 16, 2024
1 parent 9e16673 commit ce078cf
Showing 25 changed files with 329 additions and 87 deletions.
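In short, topic validation previously went through one shared Kafka producer; with this change it resolves a producer per shard, so validation runs against each shard's own Kafka cluster and settings. The sketch below is a minimal, hypothetical illustration of that idea only; the class name, map, and constructor are illustrative and not part of tw-tkms.

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical sketch: one validation producer per shard, each built from that
// shard's own Kafka settings. Nothing here is a tw-tkms class.
class PerShardValidationProducers {

  private final Map<Integer, KafkaProducer<String, byte[]>> producers = new ConcurrentHashMap<>();
  private final Map<Integer, String> bootstrapServersByShard;

  PerShardValidationProducers(Map<Integer, String> bootstrapServersByShard) {
    this.bootstrapServersByShard = bootstrapServersByShard;
  }

  KafkaProducer<String, byte[]> forShard(int shard) {
    // Lazily create and cache one producer per shard; different shards may
    // point at entirely different Kafka clusters.
    return producers.computeIfAbsent(shard, s -> {
      Properties props = new Properties();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersByShard.get(s));
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
      return new KafkaProducer<>(props);
    });
  }

  void closeAll() {
    // Close and drop every cached validation producer.
    producers.values().forEach(KafkaProducer::close);
    producers.clear();
  }
}
```

A validator built on top of something like this could call `forShard(shard).partitionsFor(topic)` per topic instead of sharing a single producer across all shards.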
20 changes: 17 additions & 3 deletions .github/workflows/build.yml
@@ -27,8 +27,10 @@ jobs:
IN_CI: true
MARIADB_TCP_3306: 3306
MARIADB_TCP_HOST: mysql1
TW_TKMS_KAFKA_TCP_9092: 9092
TW_TKMS_KAFKA_TCP_HOST: kafka1
TW_TKMS_KAFKA_1_TCP_9092: 9092
TW_TKMS_KAFKA_1_TCP_HOST: kafka1
TW_TKMS_KAFKA_2_TCP_9092: 9092
TW_TKMS_KAFKA_2_TCP_HOST: kafka2
ZOOKEEPER_TCP_2181: 2181
ZOOKEEPER_TCP_HOST: zookeeper1
POSTGRES_TCP_HOST: postgres1
@@ -54,7 +56,19 @@ jobs:
image: wurstmeister/kafka:2.12-2.2.0
env:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181/kafka1
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_MESSAGE_MAX_BYTES: "10485760"
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 20000
KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "true"
KAFKA_LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 5
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
kafka2:
image: wurstmeister/kafka:2.12-2.2.0
env:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181/kafka2
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_MESSAGE_MAX_BYTES: "10485760"
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.28.1] - 2024-01-12

### Fixed

- The topic validator now uses a Kafka producer per shard.
  This ensures that the right Kafka settings are used, since different shards can have different Kafka servers behind them.

## [0.28.0] - 2023-12-20

### Changed
1 change: 0 additions & 1 deletion demoapp/src/main/resources/application.yml
@@ -28,7 +28,6 @@ spring:
locations: "classpath:db/migration/mysql"
tw-tkms:
polling-interval: 5ms
shards-count: 1
partitions-count: 1
topics:
- ComplexTest
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1 +1 @@
version=0.28.0
version=0.28.1
@@ -18,13 +18,15 @@
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -80,50 +82,72 @@ public void afterPropertiesSet() {

@Override
public void preValidateAll() {
final var topics = tkmsProperties.getTopics();

final var semaphore = new Semaphore(tkmsProperties.getTopicValidation().getValidationConcurrencyAtInitialization());
final var failures = new AtomicInteger();
final var countDownLatch = new CountDownLatch(topics.size());
final var phaser = new Phaser(1);
final var startTimeEpochMs = System.currentTimeMillis();

for (var topic : topics) {
topicsValidatedDuringInitializationOrNotified.put(topic, Boolean.TRUE);
final var timeoutMs =
tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;
if (ExceptionUtils.doUnchecked(() -> semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS))) {
/*
We are validating one by one, to get the proper error messages from Kafka.
And, we are doing it concurrently to speed things up.
*/
executor.execute(() -> {
try {
validate(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), topic, null);

log.info("Topic '{}' successfully pre-validated.", topic);
} catch (Throwable t) {
log.error("Topic validation for '" + topic + "' failed.", t);
failures.incrementAndGet();
} finally {
countDownLatch.countDown();
semaphore.release();
}
});
} else {
break;
for (int shard = 0; shard < tkmsProperties.getShardsCount(); shard++) {
var shardProperties = tkmsProperties.getShards().get(shard);
List<String> shardTopics = shardProperties == null ? null : shardProperties.getTopics();

if (shardTopics != null && shard == tkmsProperties.getDefaultShard()) {
throw new IllegalStateException("Topics for default shard have to be specified on 'tw-tkms.topics' property.");
}

if (shard == tkmsProperties.getDefaultShard()) {
shardTopics = tkmsProperties.getTopics();
}

if (shardTopics == null) {
continue;
}

for (var topic : shardTopics) {
topicsValidatedDuringInitializationOrNotified.put(topic, Boolean.TRUE);
final var timeoutMs =
tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;
if (ExceptionUtils.doUnchecked(() -> semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS))) {

final var finalShard = shard;
/*
We are validating one by one, to get the proper error messages from Kafka.
And, we are doing it concurrently to speed things up.
*/
phaser.register();
executor.execute(() -> {
try {
validate(TkmsShardPartition.of(finalShard, 0), topic, null);

log.info("Topic '{}' successfully pre-validated.", topic);
} catch (Throwable t) {
log.error("Topic validation for '" + topic + "' failed.", t);
failures.incrementAndGet();
} finally {
phaser.arriveAndDeregister();
semaphore.release();
}
});
} else {
break;
}
}
}

final var timeoutMs =
tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;

if (!ExceptionUtils.doUnchecked(() -> countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS))) {
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
int phase = phaser.arrive();
try {
phaser.awaitAdvanceInterruptibly(phase, timeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation();
throw new IllegalStateException("Topic validation is taking too long.");
}

if (failures.get() > 0) {
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation();
throw new IllegalStateException("There were failures with topics validations. Refusing to start.");
}
}
@@ -133,7 +157,7 @@ public void validate(TkmsShardPartition shardPartition, String topic, Integer pa
if (tkmsProperties.getTopicValidation().isUseAdminClient()) {
validateUsingAdmin(shardPartition, topic, partition);
} else {
validateUsingProducer(topic);
validateUsingProducer(shardPartition, topic);
}
}

@@ -146,7 +170,10 @@ protected void validateUsingAdmin(TkmsShardPartition shardPartition, String topi
return Boolean.TRUE;
});

var response = topicDescriptionsCache.get(new FetchTopicDescriptionRequest().setTopic(topic));
var response = topicDescriptionsCache.get(new FetchTopicDescriptionRequest()
.setTopic(topic)
.setShardPartition(shardPartition)
);

if (response == null) {
throw new NullPointerException("Could not fetch topic description for topic '" + topic + "'.");
@@ -185,8 +212,8 @@ protected void validateUsingAdmin(TkmsShardPartition shardPartition, String topi
Legacy logic.
We keep it in, in case some service would run into issues with the admin client based validation.
*/
protected void validateUsingProducer(String topic) {
tkmsKafkaProducerProvider.getKafkaProducerForTopicValidation().partitionsFor(topic);
protected void validateUsingProducer(TkmsShardPartition shardPartition, String topic) {
tkmsKafkaProducerProvider.getKafkaProducerForTopicValidation(shardPartition).partitionsFor(topic);
}

protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescriptionRequest request) {
Expand All @@ -196,16 +223,18 @@ protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescript
&& result.getThrowable() instanceof UnknownTopicOrPartitionException
&& tkmsProperties.getTopicValidation().isTryToAutoCreateTopics()) {
final var topic = request.getTopic();
final var shardPartition = request.getShardPartition();

try {
validateUsingProducer(topic);
validateUsingProducer(shardPartition, topic);

log.info("Succeeded in auto creating topic `{}`", topic);

return fetchTopicDescription0(request);
} catch (Throwable t) {
log.warn("Trying to auto create topic `{}` failed.", topic, t);
// Close the producer, so it would not spam the metadata fetch failures forever.
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(shardPartition);
}
}

@@ -219,9 +248,8 @@ protected FetchTopicDescriptionResponse fetchTopicDescription0(FetchTopicDescrip
Throwable throwable = null;

try {
topicDescription = tkmsKafkaAdminProvider.getKafkaAdmin().describeTopics(Collections.singleton(topic),
new DescribeTopicsOptions().includeAuthorizedOperations(true))
.allTopicNames().get(30, TimeUnit.SECONDS).get(topic);
topicDescription = tkmsKafkaAdminProvider.getKafkaAdmin(request.getShardPartition()).describeTopics(Collections.singleton(topic),
new DescribeTopicsOptions().includeAuthorizedOperations(true)).allTopicNames().get(30, TimeUnit.SECONDS).get(topic);
} catch (Throwable t) {
if (t instanceof ExecutionException) {
throwable = t.getCause();
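In the validator diff above, the fixed-size `CountDownLatch` is replaced with a `Phaser`, since the number of validation tasks now depends on how many shards declare topics and is not known when the wait object is created. Below is a standalone, hypothetical sketch of that register/arrive pattern; the task body is a placeholder rather than the real topic validation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PhaserWaitSketch {

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    Phaser phaser = new Phaser(1); // party 1 is the coordinating thread

    for (int i = 0; i < 10; i++) {
      final int taskNo = i;
      phaser.register(); // one extra party per submitted task
      executor.execute(() -> {
        try {
          // ... one topic validation would happen here ...
          System.out.println("validated task " + taskNo);
        } finally {
          phaser.arriveAndDeregister(); // task done, drop its party
        }
      });
    }

    int phase = phaser.arrive(); // coordinator arrives but stays registered
    try {
      // Wait until every registered task has arrived, or time out.
      phaser.awaitAdvanceInterruptibly(phase, 30, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      throw new IllegalStateException("Validation is taking too long.", e);
    } finally {
      executor.shutdown();
    }
  }
}
```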
@@ -520,6 +520,7 @@ protected int getPartition(int shard, TkmsMessage message) {
@Accessors(chain = true)
protected static class FetchTopicDescriptionRequest {

private TkmsShardPartition shardPartition;
private String topic;
}

@@ -1,11 +1,12 @@
package com.transferwise.kafka.tkms.config;

import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import org.apache.kafka.clients.admin.Admin;

public interface ITkmsKafkaAdminProvider {

Admin getKafkaAdmin();
Admin getKafkaAdmin(TkmsShardPartition tkmsShardPartition);

void closeKafkaAdmin();
void closeKafkaAdmin(TkmsShardPartition tkmsShardPartition);

}
@@ -7,11 +7,13 @@ public interface ITkmsKafkaProducerProvider {

KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation();
KafkaProducer<String, byte[]> getKafkaProducerForTopicValidation(TkmsShardPartition shardPartition);

void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

void closeKafkaProducerForTopicValidation();
void closeKafkaProducerForTopicValidation(TkmsShardPartition tkmsShardPartition);

void closeKafkaProducersForTopicValidation();

enum UseCase {
PROXY,
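Assuming the interface above sits in the same `com.transferwise.kafka.tkms.config` package as the neighbouring files, a hypothetical caller of the new per-shard methods might look like the sketch below; `TopicValidationCaller` is illustrative and only exercises the methods declared in the diff.

```java
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider;

// Hypothetical caller of the per-shard interface; 'provider' would normally be injected.
class TopicValidationCaller {

  private final ITkmsKafkaProducerProvider provider;

  TopicValidationCaller(ITkmsKafkaProducerProvider provider) {
    this.provider = provider;
  }

  void validate(int shard, String topic) {
    var shardPartition = TkmsShardPartition.of(shard, 0);
    // Fetching partition metadata fails fast if the topic does not exist
    // on that shard's cluster and cannot be auto-created.
    provider.getKafkaProducerForTopicValidation(shardPartition).partitionsFor(topic);
  }

  void shutdown() {
    // Closes the validation producers of every shard.
    provider.closeKafkaProducersForTopicValidation();
  }
}
```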
@@ -1,11 +1,14 @@
package com.transferwise.kafka.tkms.config;

import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import com.transferwise.kafka.tkms.config.TkmsProperties.ShardProperties;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Data;
@@ -19,6 +22,7 @@
@Slf4j
public class TkmsKafkaAdminProvider implements ITkmsKafkaAdminProvider, GracefulShutdownStrategy {

private static final Set<String> CONFIG_NAMES = AdminClientConfig.configNames();
/**
* Keep the kafka-clients' MBean registration happy.
*/
@@ -30,11 +34,11 @@ public class TkmsKafkaAdminProvider implements ITkmsKafkaAdminProvider, Graceful
@Autowired
private MeterRegistry meterRegistry;

private Map<Long, AdminEntry> admins = new ConcurrentHashMap<>();
private Map<TkmsShardPartition, AdminEntry> admins = new ConcurrentHashMap<>();

@Override
public Admin getKafkaAdmin() {
return admins.computeIfAbsent(0L, key -> {
public Admin getKafkaAdmin(TkmsShardPartition tkmsShardPartition) {
return admins.computeIfAbsent(tkmsShardPartition, key -> {
var configs = new HashMap<String, Object>();

configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "Please specify 'tw-tkms.kafka.bootstrap.servers'.");
@@ -43,13 +47,21 @@ public Admin getKafkaAdmin() {
configs.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, 100);
configs.put(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5000);

var configNames = AdminClientConfig.configNames();
for (var e : tkmsProperties.getKafka().entrySet()) {
if (configNames.contains(e.getKey())) {
if (CONFIG_NAMES.contains(e.getKey())) {
configs.put(e.getKey(), e.getValue());
}
}

ShardProperties shardProperties = tkmsProperties.getShards().get(tkmsShardPartition.getShard());
if (shardProperties != null) {
for (var e : shardProperties.getKafka().entrySet()) {
if (CONFIG_NAMES.contains(e.getKey())) {
configs.put(e.getKey(), e.getValue());
}
}
}

final var admin = KafkaAdminClient.create(configs);
final var kafkaClientMetrics = new KafkaClientMetrics(admin);
kafkaClientMetrics.bindTo(meterRegistry);
@@ -59,8 +71,8 @@ public Admin getKafkaAdmin() {
}

@Override
public void closeKafkaAdmin() {
var adminEntry = admins.remove(0L);
public void closeKafkaAdmin(TkmsShardPartition tkmsShardPartition) {
var adminEntry = admins.remove(tkmsShardPartition);

if (adminEntry == null) {
return;
@@ -77,7 +89,7 @@ public void closeKafkaAdmin() {

@Override
public void applicationTerminating() {
closeKafkaAdmin();
admins.keySet().forEach((this::closeKafkaAdmin));
}

@Override
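The admin provider above builds its client config by starting from the global tw-tkms Kafka settings, overlaying the shard's own settings, and dropping keys the admin client does not recognise. The helper below isolates that layering as a hypothetical sketch; `KafkaConfigLayering` and `layered` are not tw-tkms names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Hypothetical helper mirroring the layering above: shard-level Kafka settings
// override the global ones, and unknown keys are filtered out.
final class KafkaConfigLayering {

  private static final Set<String> ADMIN_CONFIG_NAMES = AdminClientConfig.configNames();

  static Map<String, Object> layered(Map<String, String> globalKafka, Map<String, String> shardKafka) {
    var configs = new HashMap<String, Object>();
    globalKafka.forEach((k, v) -> {
      if (ADMIN_CONFIG_NAMES.contains(k)) {
        configs.put(k, v);
      }
    });
    if (shardKafka != null) {
      shardKafka.forEach((k, v) -> {
        if (ADMIN_CONFIG_NAMES.contains(k)) {
          configs.put(k, v); // shard value wins over the global one
        }
      });
    }
    return configs;
  }
}
```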