Skip to content

Commit

Permalink
Trying to auto-create topics, when using admin client for topic valid…
Browse files Browse the repository at this point in the history
…ation (#80)

* Trying to auto-create topics, when using admin client for topic validation
  • Loading branch information
onukristo authored Jan 2, 2024
1 parent 23c474e commit cc909ba
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 36 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.28.0] - 2023-12-20

### Changed

- Configuration options related to topic validation.
- Trying to auto-create topics, when using admin client for topic validation.

### Removed

- Unused `debugEnabled` property.

## [0.27.0] - 2023-12-20

### Changed
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.27.0
version=0.28.0
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import lombok.experimental.UtilityClass;

/*
Mainly used to add verbose log to investigate specfic flaky tests.
Mainly used to add verbose log to investigate specific flaky tests.
*/
@UtilityClass
public class Debug {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,15 @@ public void afterPropertiesSet() {
public void preValidateAll() {
final var topics = tkmsProperties.getTopics();

final var semaphore = new Semaphore(tkmsProperties.getAdminClientTopicsValidationConcurrency());
final var semaphore = new Semaphore(tkmsProperties.getTopicValidation().getValidationConcurrencyAtInitialization());
final var failures = new AtomicInteger();
final var countDownLatch = new CountDownLatch(topics.size());
final var startTimeEpochMs = System.currentTimeMillis();

for (var topic : topics) {
topicsValidatedDuringInitializationOrNotified.put(topic, Boolean.TRUE);
final var timeoutMs = tkmsProperties.getInternals().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;
final var timeoutMs =
tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;
if (ExceptionUtils.doUnchecked(() -> semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS))) {
/*
We are validating one by one, to get the proper error messages from Kafka.
Expand All @@ -113,7 +114,8 @@ public void preValidateAll() {
}
}

final var timeoutMs = tkmsProperties.getInternals().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;
final var timeoutMs =
tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;

if (!ExceptionUtils.doUnchecked(() -> countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS))) {
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
Expand All @@ -128,7 +130,7 @@ public void preValidateAll() {

@Override
public void validate(TkmsShardPartition shardPartition, String topic, Integer partition) {
if (tkmsProperties.isUseAdminClientForTopicsValidation()) {
if (tkmsProperties.getTopicValidation().isUseAdminClient()) {
validateUsingAdmin(shardPartition, topic, partition);
} else {
validateUsingProducer(topic);
Expand Down Expand Up @@ -188,6 +190,29 @@ protected void validateUsingProducer(String topic) {
}

protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescriptionRequest request) {
final var result = fetchTopicDescription0(request);

if (result.getThrowable() != null
&& result.getThrowable() instanceof UnknownTopicOrPartitionException
&& tkmsProperties.getTopicValidation().isTryToAutoCreateTopics()) {
final var topic = request.getTopic();
try {
validateUsingProducer(topic);

log.info("Succeeded in auto creating topic `{}`", topic);

return fetchTopicDescription0(request);
} catch (Throwable t) {
log.warn("Trying to auto create topic `{}` failed.", topic, t);
// Close the producer, so it would not spam the metadata fetch failures forever.
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
}
}

return result;
}

protected FetchTopicDescriptionResponse fetchTopicDescription0(FetchTopicDescriptionRequest request) {
final var topic = request.getTopic();
TopicDescription topicDescription = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ public void afterPropertiesSet() {
*/
private Map<NotificationType, NotificationLevel> notificationLevels = new HashMap<>();

/**
* Provides more metrics at performance penalty.
*/
private boolean debugEnabled;

/**
* The default number of partitions in a shard.
*
Expand Down Expand Up @@ -206,21 +201,9 @@ public void afterPropertiesSet() {
@LegacyResolvedValue
private List<String> topics = new ArrayList<>();

/**
* Uses AdminClient to validate topics.
*
* <p>AdminClient allows us to also check if topics have suitable ACLs.
*
* <p>Experimental option.
*
* <p>May be the default in the future.
*/
private boolean useAdminClientForTopicsValidation = false;

/**
* How many topics validations are we doing in parallel, during the initialization of Tkms.
*/
private int adminClientTopicsValidationConcurrency = 10;
@Valid
@jakarta.validation.Valid
private TopicValidation topicValidation = new TopicValidation();

@Valid
@jakarta.validation.Valid
Expand Down Expand Up @@ -549,10 +532,40 @@ public static class Internals {
*/
private Duration flushInterruptionDuration = Duration.ofSeconds(30);

}

@Data
@Accessors(chain = true)
public static class TopicValidation {

/**
* How long do we wait for topics to get pre-validated.
*/
private Duration topicPreValidationTimeout = Duration.ofMinutes(1);

/**
* Uses AdminClient to validate topics.
*
* <p>AdminClient allows us to also check if topics have suitable ACLs.
*
* <p>Experimental option.
*
* <p>May be the default in the future.
*/
private boolean useAdminClient = false;

/**
* How many topics validations are we doing in parallel, during the initialization of Tkms.
*/
private int validationConcurrencyAtInitialization = 10;

/**
* Tries to auto create topic using the producer, when admin client based topic validation fails.
*
* <p>This would allow us still to use the capable admin client based validation in environments
* where topics are expected to be auto created.
*/
private boolean tryToAutoCreateTopics = true;
}

public enum NotificationLevel {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void cleanup() {
((TransactionalKafkaMessageSender) transactionalKafkaMessageSender).setTkmsDaoProvider(tkmsDaoProvider);
tkmsProperties.setDeferMessageRegistrationUntilCommit(false);
tkmsProperties.setValidateSerialization(false);
tkmsProperties.setUseAdminClientForTopicsValidation(false);
tkmsProperties.getTopicValidation().setUseAdminClient(false);
}

protected void setupConfig(boolean deferUntilCommit) {
Expand Down Expand Up @@ -484,7 +484,7 @@ private static Stream<Arguments> unknownTopicsMatrix() {
void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean deferUntilCommit, boolean useAdminClient) {
try {
setupConfig(deferUntilCommit);
tkmsProperties.setUseAdminClientForTopicsValidation(useAdminClient);
tkmsProperties.getTopicValidation().setUseAdminClient(useAdminClient);

var expectedMessage =
useAdminClient ? "Topic 'NotExistingTopic' does not exist." : "Topic NotExistingTopic not present in metadata after";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ private static Stream<Arguments> compressionInput() {
var arguments = new ArrayList<Arguments>();

for (var deferUntilCommit : deferUntilCommits) {
arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 111, 110, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1171, 1171, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 134, 134, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 160, 160, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 158, 158, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 100, 100, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 112, 111, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1172, 1171, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 135, 134, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 164, 160, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 162, 158, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 101, 100, deferUntilCommit));
}

return arguments.stream();
Expand Down
5 changes: 3 additions & 2 deletions tw-tkms-starter/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ tw-tkms:
ms: 7
earliest-visible-messages:
enabled: true
debug-enabled: true
compression:
algorithm: random
min-size: 10
Expand Down Expand Up @@ -80,8 +79,10 @@ spring:
tw-tkms:
database-dialect: POSTGRES
delete-batch-sizes: "51, 11, 5, 1"
use-admin-client-for-topics-validation: true
topic-validation:
use-admin-client: true

# We will not pre-create the topic in order to test the
tw-tkms-test:
test-topic: TestTopicPostgres

Expand Down

0 comments on commit cc909ba

Please sign in to comment.