diff --git a/CHANGELOG.md b/CHANGELOG.md index 6861dec..c92c916 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.28.0] - 2023-12-20 + +### Changed + +- Configuration options related to topic validation. +- Trying to auto-create topics, when using admin client for topic validation. + +### Removed + +- Unused `debugEnabled` property. + ## [0.27.0] - 2023-12-20 ### Changed diff --git a/gradle.properties b/gradle.properties index e80baea..0f84cfd 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.27.0 +version=0.28.0 diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/Debug.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/Debug.java index 4909b22..caa6f1d 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/Debug.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/Debug.java @@ -5,7 +5,7 @@ import lombok.experimental.UtilityClass; /* - Mainly used to add verbose log to investigate specfic flaky tests. + Mainly used to add verbose log to investigate specific flaky tests. */ @UtilityClass public class Debug { diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java index c94d1f2..af0442a 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java @@ -82,14 +82,15 @@ public void afterPropertiesSet() { public void preValidateAll() { final var topics = tkmsProperties.getTopics(); - final var semaphore = new Semaphore(tkmsProperties.getAdminClientTopicsValidationConcurrency()); + final var semaphore = new Semaphore(tkmsProperties.getTopicValidation().getValidationConcurrencyAtInitialization()); final var failures = new AtomicInteger(); final var countDownLatch = new CountDownLatch(topics.size()); final var startTimeEpochMs = System.currentTimeMillis(); for (var topic : topics) { topicsValidatedDuringInitializationOrNotified.put(topic, Boolean.TRUE); - final var timeoutMs = tkmsProperties.getInternals().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs; + final var timeoutMs = + tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs; if (ExceptionUtils.doUnchecked(() -> semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS))) { /* We are validating one by one, to get the proper error messages from Kafka. @@ -113,7 +114,8 @@ public void preValidateAll() { } } - final var timeoutMs = tkmsProperties.getInternals().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs; + final var timeoutMs = + tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs; if (!ExceptionUtils.doUnchecked(() -> countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS))) { tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); @@ -128,7 +130,7 @@ public void preValidateAll() { @Override public void validate(TkmsShardPartition shardPartition, String topic, Integer partition) { - if (tkmsProperties.isUseAdminClientForTopicsValidation()) { + if (tkmsProperties.getTopicValidation().isUseAdminClient()) { validateUsingAdmin(shardPartition, topic, partition); } else { validateUsingProducer(topic); @@ -188,6 +190,29 @@ protected void validateUsingProducer(String topic) { } protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescriptionRequest request) { + final var result = fetchTopicDescription0(request); + + if (result.getThrowable() != null + && result.getThrowable() instanceof UnknownTopicOrPartitionException + && tkmsProperties.getTopicValidation().isTryToAutoCreateTopics()) { + final var topic = request.getTopic(); + try { + validateUsingProducer(topic); + + log.info("Succeeded in auto creating topic `{}`", topic); + + return fetchTopicDescription0(request); + } catch (Throwable t) { + log.warn("Trying to auto create topic `{}` failed.", topic, t); + // Close the producer, so it would not spam the metadata fetch failures forever. + tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); + } + } + + return result; + } + + protected FetchTopicDescriptionResponse fetchTopicDescription0(FetchTopicDescriptionRequest request) { final var topic = request.getTopic(); TopicDescription topicDescription = null; diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java index 7d3ba7e..65cf8fc 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java @@ -36,11 +36,6 @@ public void afterPropertiesSet() { */ private Map notificationLevels = new HashMap<>(); - /** - * Provides more metrics at performance penalty. - */ - private boolean debugEnabled; - /** * The default number of partitions in a shard. * @@ -206,21 +201,9 @@ public void afterPropertiesSet() { @LegacyResolvedValue private List topics = new ArrayList<>(); - /** - * Uses AdminClient to validate topics. - * - *

AdminClient allows us to also check if topics have suitable ACLs. - * - *

Experimental option. - * - *

May be the default in the future. - */ - private boolean useAdminClientForTopicsValidation = false; - - /** - * How many topics validations are we doing in parallel, during the initialization of Tkms. - */ - private int adminClientTopicsValidationConcurrency = 10; + @Valid + @jakarta.validation.Valid + private TopicValidation topicValidation = new TopicValidation(); @Valid @jakarta.validation.Valid @@ -549,10 +532,40 @@ public static class Internals { */ private Duration flushInterruptionDuration = Duration.ofSeconds(30); + } + + @Data + @Accessors(chain = true) + public static class TopicValidation { + /** * How long do we wait for topics to get pre-validated. */ private Duration topicPreValidationTimeout = Duration.ofMinutes(1); + + /** + * Uses AdminClient to validate topics. + * + *

AdminClient allows us to also check if topics have suitable ACLs. + * + *

Experimental option. + * + *

May be the default in the future. + */ + private boolean useAdminClient = false; + + /** + * How many topics validations are we doing in parallel, during the initialization of Tkms. + */ + private int validationConcurrencyAtInitialization = 10; + + /** + * Tries to auto create topic using the producer, when admin client based topic validation fails. + * + *

This would allow us still to use the capable admin client based validation in environments + * where topics are expected to be auto created. + */ + private boolean tryToAutoCreateTopics = true; } public enum NotificationLevel { diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java index 7f974c7..36c559c 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java @@ -88,7 +88,7 @@ public void cleanup() { ((TransactionalKafkaMessageSender) transactionalKafkaMessageSender).setTkmsDaoProvider(tkmsDaoProvider); tkmsProperties.setDeferMessageRegistrationUntilCommit(false); tkmsProperties.setValidateSerialization(false); - tkmsProperties.setUseAdminClientForTopicsValidation(false); + tkmsProperties.getTopicValidation().setUseAdminClient(false); } protected void setupConfig(boolean deferUntilCommit) { @@ -484,7 +484,7 @@ private static Stream unknownTopicsMatrix() { void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean deferUntilCommit, boolean useAdminClient) { try { setupConfig(deferUntilCommit); - tkmsProperties.setUseAdminClientForTopicsValidation(useAdminClient); + tkmsProperties.getTopicValidation().setUseAdminClient(useAdminClient); var expectedMessage = useAdminClient ? "Topic 'NotExistingTopic' does not exist." : "Topic NotExistingTopic not present in metadata after"; diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java index db80201..821f952 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/PostgresEndToEndIntTest.java @@ -16,12 +16,12 @@ private static Stream compressionInput() { var arguments = new ArrayList(); for (var deferUntilCommit : deferUntilCommits) { - arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 111, 110, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1171, 1171, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 134, 134, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 160, 160, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 158, 158, deferUntilCommit)); - arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 100, 100, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 112, 111, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1172, 1171, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 135, 134, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 164, 160, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 162, 158, deferUntilCommit)); + arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 101, 100, deferUntilCommit)); } return arguments.stream(); diff --git a/tw-tkms-starter/src/test/resources/application.yml b/tw-tkms-starter/src/test/resources/application.yml index eb25838..1cc2ad3 100644 --- a/tw-tkms-starter/src/test/resources/application.yml +++ b/tw-tkms-starter/src/test/resources/application.yml @@ -43,7 +43,6 @@ tw-tkms: ms: 7 earliest-visible-messages: enabled: true - debug-enabled: true compression: algorithm: random min-size: 10 @@ -80,8 +79,10 @@ spring: tw-tkms: database-dialect: POSTGRES delete-batch-sizes: "51, 11, 5, 1" - use-admin-client-for-topics-validation: true + topic-validation: + use-admin-client: true +# We will not pre-create the topic in order to test the tw-tkms-test: test-topic: TestTopicPostgres