Skip to content

Commit

Permalink
Topics validation can be done via Kafka Admin Client. (#79)
Browse files Browse the repository at this point in the history
Topics validation can be done via Kafka Admin Client.
  • Loading branch information
onukristo authored Dec 21, 2023
1 parent b6cb80a commit 23c474e
Show file tree
Hide file tree
Showing 16 changed files with 446 additions and 25 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.27.0] - 2023-12-20

### Changed

- Topic validation can now be done via the Kafka Admin client, instead of the Kafka Producer.
  The new logic is behind a feature flag until it has been more battle-tested.

## [0.26.0] - 2023-12-14

### Removed
Expand Down
1 change: 1 addition & 0 deletions build.libraries.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ext {
zstdJni : 'com.github.luben:zstd-jni:1.5.2-1',

// versions managed by spring-boot-dependencies platform
caffeine : "com.github.ben-manes.caffeine:caffeine",
commonsLang3 : 'org.apache.commons:commons-lang3',
flywayCore : 'org.flywaydb:flyway-core',
flywayMysql : 'org.flywaydb:flyway-mysql',
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.26.0
version=0.27.0
5 changes: 3 additions & 2 deletions tw-tkms-starter/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
compileOnly libraries.javaxValidationApi
compileOnly libraries.jakartaValidationApi

implementation libraries.caffeine
implementation libraries.commonsLang3
implementation libraries.curatorRecipes
implementation libraries.guava
Expand Down Expand Up @@ -121,7 +122,7 @@ shadowJar {
attributes 'Implementation-Version': "$project.version"
}
relocate('com.google.protobuf', 'com.transferwise.kafka.tkms.shadow.com.google.protobuf')

// Minimize does not reduce the jar much (1.9->1.5 MB), so let's not risk/mess with that.
/*
minimize {}
Expand Down Expand Up @@ -206,7 +207,7 @@ publishing {
sign publishing.publications.twTkmsStarter
}
}

repositories {
maven {
url System.getenv("MAVEN_URL")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.transferwise.kafka.tkms;

import com.transferwise.kafka.tkms.api.TkmsShardPartition;

/**
 * Contract for checking that a Kafka topic is usable before messages are registered for it.
 */
public interface ITkmsTopicValidator {

/**
 * Validates a set of topics up front.
 *
 * <p>Called from {@code TransactionalKafkaMessageSender.afterPropertiesSet()}, i.e. during initialization.
 * NOTE(review): which topics are covered and how failures are signalled is decided by the implementation.
 */
void preValidateAll();

/**
 * Validates a single topic.
 *
 * @param tkmsShardPartition shard-partition the validation is done on behalf of
 * @param topic name of the Kafka topic to validate
 * @param partition Kafka partition requested by the message; callers pass {@code null} when no specific partition is requested
 */
void validate(TkmsShardPartition tkmsShardPartition, String topic, Integer partition);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package com.transferwise.kafka.tkms;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.transferwise.common.baseutils.ExceptionUtils;
import com.transferwise.common.baseutils.concurrency.IExecutorServicesProvider;
import com.transferwise.common.baseutils.concurrency.ThreadNamingExecutorServiceWrapper;
import com.transferwise.kafka.tkms.TransactionalKafkaMessageSender.FetchTopicDescriptionRequest;
import com.transferwise.kafka.tkms.TransactionalKafkaMessageSender.FetchTopicDescriptionResponse;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import com.transferwise.kafka.tkms.config.ITkmsKafkaAdminProvider;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider;
import com.transferwise.kafka.tkms.config.TkmsProperties;
import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationLevel;
import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationType;
import com.transferwise.kafka.tkms.metrics.ITkmsMetricsTemplate;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
public class TkmsTopicValidator implements ITkmsTopicValidator, InitializingBean {

@Autowired
private IExecutorServicesProvider executorServicesProvider;

@Autowired
private MeterRegistry meterRegistry;

@Autowired
private TkmsProperties tkmsProperties;

@Autowired
protected ITkmsKafkaAdminProvider tkmsKafkaAdminProvider;

@Autowired
protected IProblemNotifier problemNotifier;

@Autowired
protected ITkmsKafkaProducerProvider tkmsKafkaProducerProvider;

@Autowired
protected ITkmsMetricsTemplate tkmsMetricsTemplate;

private LoadingCache<FetchTopicDescriptionRequest, FetchTopicDescriptionResponse> topicDescriptionsCache;

private final Map<String, Boolean> topicsValidatedDuringInitializationOrNotified = new ConcurrentHashMap<>();

private ExecutorService executor;

public void afterPropertiesSet() {
this.executor = new ThreadNamingExecutorServiceWrapper("tw-tkms-td-cache", executorServicesProvider.getGlobalExecutorService());
topicDescriptionsCache = Caffeine.newBuilder()
.maximumSize(10_000)
.executor(executor)
.expireAfterWrite(Duration.ofMinutes(5))
.refreshAfterWrite(Duration.ofSeconds(30))
.recordStats()
.build(this::fetchTopicDescription);

CaffeineCacheMetrics.monitor(meterRegistry, topicDescriptionsCache, "tkmsTopicDescriptions");
}

@Override
public void preValidateAll() {
final var topics = tkmsProperties.getTopics();

final var semaphore = new Semaphore(tkmsProperties.getAdminClientTopicsValidationConcurrency());
final var failures = new AtomicInteger();
final var countDownLatch = new CountDownLatch(topics.size());
final var startTimeEpochMs = System.currentTimeMillis();

for (var topic : topics) {
topicsValidatedDuringInitializationOrNotified.put(topic, Boolean.TRUE);
final var timeoutMs = tkmsProperties.getInternals().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;
if (ExceptionUtils.doUnchecked(() -> semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS))) {
/*
We are validating one by one, to get the proper error messages from Kafka.
And, we are doing it concurrently to speed things up.
*/
executor.execute(() -> {
try {
validate(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), topic, null);

log.info("Topic '{}' successfully pre-validated.", topic);
} catch (Throwable t) {
log.error("Topic validation for '" + topic + "' failed.", t);
failures.incrementAndGet();
} finally {
countDownLatch.countDown();
semaphore.release();
}
});
} else {
break;
}
}

final var timeoutMs = tkmsProperties.getInternals().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;

if (!ExceptionUtils.doUnchecked(() -> countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS))) {
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
throw new IllegalStateException("Topic validation is taking too long.");
}

if (failures.get() > 0) {
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
throw new IllegalStateException("There were failures with topics validations. Refusing to start.");
}
}

@Override
public void validate(TkmsShardPartition shardPartition, String topic, Integer partition) {
if (tkmsProperties.isUseAdminClientForTopicsValidation()) {
validateUsingAdmin(shardPartition, topic, partition);
} else {
validateUsingProducer(topic);
}
}

protected void validateUsingAdmin(TkmsShardPartition shardPartition, String topic, Integer partition) {
topicsValidatedDuringInitializationOrNotified.computeIfAbsent(topic, k -> {
problemNotifier.notify(shardPartition.getShard(), NotificationType.TOPIC_NOT_VALIDATED_AT_INIT, NotificationLevel.WARN, () ->
"Topic '" + topic + "' was not validated during initialization. This can introduce some lag."
+ " Please specify all the topics this service is using in the Tkms property of 'topics'."
);
return Boolean.TRUE;
});

var response = topicDescriptionsCache.get(new FetchTopicDescriptionRequest().setTopic(topic));

if (response == null) {
throw new NullPointerException("Could not fetch topic description for topic '" + topic + "'.");
}

if (response.getThrowable() != null) {
String message;
if (response.getThrowable() instanceof UnknownTopicOrPartitionException) {
message = "Topic '" + topic + "' does not exist.";
} else {
message = "Topic validation for '" + topic + "' failed.";
}
throw new IllegalStateException(message, response.getThrowable());
}

final var topicDescription = response.getTopicDescription();

final var aclOperations = topicDescription.authorizedOperations();
if (aclOperations == null || aclOperations.isEmpty()) {
tkmsMetricsTemplate.registerNoAclOperationsFetched(shardPartition, topic);
} else if (!aclOperations.contains(AclOperation.ALL)
&& !aclOperations.contains(AclOperation.WRITE)
) {
throw new IllegalStateException("The service does not have any ACLs of ALL/WRITE/IDEMPOTENT_WRITE on topic '" + topic + "'."
+ " The ACLs available are '" + StringUtils.join(aclOperations, ",") + "'.");
}

if (partition != null) {
if (topicDescription.partitions().size() < partition - 1) {
throw new IllegalStateException("Kafka partition " + partition + " does not exist for topic '" + topic + "'.");
}
}
}

/*
Legacy logic.
We keep it in, in case some service would run into issues with the admin client based validation.
*/
protected void validateUsingProducer(String topic) {
tkmsKafkaProducerProvider.getKafkaProducerForTopicValidation().partitionsFor(topic);
}

protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescriptionRequest request) {
final var topic = request.getTopic();
TopicDescription topicDescription = null;

Throwable throwable = null;

try {
topicDescription = tkmsKafkaAdminProvider.getKafkaAdmin().describeTopics(Collections.singleton(topic),
new DescribeTopicsOptions().includeAuthorizedOperations(true))
.allTopicNames().get(30, TimeUnit.SECONDS).get(topic);
} catch (Throwable t) {
if (t instanceof ExecutionException) {
throwable = t.getCause();
} else {
throwable = t;
}
}

if (throwable != null) {
return new FetchTopicDescriptionResponse().setThrowable(throwable);
}

return new FetchTopicDescriptionResponse().setTopicDescription(topicDescription);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import com.transferwise.kafka.tkms.config.ITkmsDaoProvider;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider;
import com.transferwise.kafka.tkms.config.TkmsProperties;
import com.transferwise.kafka.tkms.config.TkmsProperties.DatabaseDialect;
import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationLevel;
Expand All @@ -25,7 +24,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.TopicDescription;
import org.slf4j.MDC;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -53,26 +55,22 @@ protected void setTkmsDaoProvider(ITkmsDaoProvider tkmsDaoProvider) {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private ITkmsKafkaProducerProvider kafkaProducerProvider;
@Autowired
private IEnvironmentValidator environmentValidator;
@Autowired
private ITransactionsHelper transactionsHelper;
@Autowired
private IProblemNotifier problemNotifier;
@Autowired
private ITkmsTopicValidator tkmsTopicValidator;

private volatile List<ITkmsEventsListener> tkmsEventsListeners;
private RateLimiter errorLogRateLimiter = RateLimiter.create(2);

@Override
public void afterPropertiesSet() {
Assertions.setLevel(properties.getInternals().getAssertionLevel());

environmentValidator.validate();

for (String topic : properties.getTopics()) {
validateTopic(topic);
}
tkmsTopicValidator.preValidateAll();

validateDeleteBatchSizes();

Expand Down Expand Up @@ -168,7 +166,9 @@ public SendMessagesResult sendMessages(SendMessagesRequest request) {

var topic = tkmsMessage.getTopic();
if (!validatedTopics.contains(topic)) {
validateTopic(topic);
tkmsTopicValidator.validate(shardPartition, topic, tkmsMessage.getPartition());

// TODO: Remove when we leave only admin client based validations in.
validatedTopics.add(topic);
}

Expand Down Expand Up @@ -264,7 +264,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) {
validateMessageSize(message, 0);

var topic = message.getTopic();
validateTopic(topic);
tkmsTopicValidator.validate(shardPartition, topic, message.getPartition());

if (deferMessageRegistrationUntilCommit) {
// Transaction is guaranteed to be active here.
Expand Down Expand Up @@ -371,13 +371,6 @@ public void afterCompletion(int status) {
}
}

/**
* Every call to normal `KafkaProducer.send()` uses metadata for a topic as well, so should be very fast.
*/
protected void validateTopic(String topic) {
kafkaProducerProvider.getKafkaProducerForTopicValidation().partitionsFor(topic);
}

protected void validateMessages(SendMessagesRequest request) {
for (int i = 0; i < request.getTkmsMessages().size(); i++) {
var tkmsMessage = request.getTkmsMessages().get(i);
Expand Down Expand Up @@ -522,4 +515,20 @@ protected int getPartition(int shard, TkmsMessage message) {
return ThreadLocalRandom.current().nextInt(tablesCount);
}


/**
 * Identifies the topic whose description should be fetched.
 *
 * <p>Used as the key type of the topic descriptions cache in {@code TkmsTopicValidator}; the Lombok {@code @Data}
 * annotation generates the {@code equals}/{@code hashCode} based on {@code topic}, which such a key needs.
 */
@Data
@Accessors(chain = true)
protected static class FetchTopicDescriptionRequest {

// Name of the Kafka topic to describe.
private String topic;
}

/**
 * Outcome of fetching a topic description.
 *
 * <p>{@code TkmsTopicValidator.fetchTopicDescription} sets {@code throwable} when the fetch failed and
 * {@code topicDescription} otherwise.
 */
@Data
@Accessors(chain = true)
protected static class FetchTopicDescriptionResponse {

// Failure encountered while fetching the description; null when no failure was recorded.
private Throwable throwable;

// Fetched topic description; not set when a failure was recorded.
private TopicDescription topicDescription;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.transferwise.kafka.tkms.config;

import org.apache.kafka.clients.admin.Admin;

/**
 * Provides the Kafka {@link Admin} client used by Tkms.
 */
public interface ITkmsKafkaAdminProvider {

/**
 * Returns the Kafka admin client.
 *
 * <p>{@code TkmsTopicValidator} uses it to describe topics during topic validation.
 */
Admin getKafkaAdmin();

/**
 * Closes the Kafka admin client.
 *
 * <p>NOTE(review): who calls this, and whether {@link #getKafkaAdmin()} may be used afterwards, is not visible here - confirm
 * against the implementation.
 */
void closeKafkaAdmin();

}
Loading

0 comments on commit 23c474e

Please sign in to comment.