Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trying to auto-create topics, when using admin client for topic validation #80

Merged
merged 3 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.28.0] - 2023-12-20

### Changed

- Configuration options related to topic validation.
- Trying to auto-create topics, when using admin client for topic validation.

### Removed

- Unused `debugEnabled` property.

## [0.27.0] - 2023-12-20

### Changed
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.27.0
version=0.28.0
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import lombok.experimental.UtilityClass;

/*
Mainly used to add verbose log to investigate specfic flaky tests.
Mainly used to add verbose log to investigate specific flaky tests.
*/
@UtilityClass
public class Debug {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,15 @@ public void afterPropertiesSet() {
public void preValidateAll() {
final var topics = tkmsProperties.getTopics();

final var semaphore = new Semaphore(tkmsProperties.getAdminClientTopicsValidationConcurrency());
final var semaphore = new Semaphore(tkmsProperties.getTopicValidation().getValidationConcurrencyAtInitialization());
final var failures = new AtomicInteger();
final var countDownLatch = new CountDownLatch(topics.size());
final var startTimeEpochMs = System.currentTimeMillis();

for (var topic : topics) {
topicsValidatedDuringInitializationOrNotified.put(topic, Boolean.TRUE);
final var timeoutMs = tkmsProperties.getInternals().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;
final var timeoutMs =
tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;
if (ExceptionUtils.doUnchecked(() -> semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS))) {
/*
We are validating one by one, to get the proper error messages from Kafka.
Expand All @@ -113,7 +114,8 @@ public void preValidateAll() {
}
}

final var timeoutMs = tkmsProperties.getInternals().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;
final var timeoutMs =
tkmsProperties.getTopicValidation().getTopicPreValidationTimeout().toMillis() - System.currentTimeMillis() + startTimeEpochMs;

if (!ExceptionUtils.doUnchecked(() -> countDownLatch.await(timeoutMs, TimeUnit.MILLISECONDS))) {
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
Expand All @@ -128,7 +130,7 @@ public void preValidateAll() {

@Override
public void validate(TkmsShardPartition shardPartition, String topic, Integer partition) {
if (tkmsProperties.isUseAdminClientForTopicsValidation()) {
if (tkmsProperties.getTopicValidation().isUseAdminClient()) {
validateUsingAdmin(shardPartition, topic, partition);
} else {
validateUsingProducer(topic);
Expand Down Expand Up @@ -188,6 +190,29 @@ protected void validateUsingProducer(String topic) {
}

protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescriptionRequest request) {
final var result = fetchTopicDescription0(request);

if (result.getThrowable() != null
&& result.getThrowable() instanceof UnknownTopicOrPartitionException
&& tkmsProperties.getTopicValidation().isTryToAutoCreateTopics()) {
final var topic = request.getTopic();
try {
validateUsingProducer(topic);

log.info("Succeeded in auto creating topic `{}`", topic);

return fetchTopicDescription0(request);
} catch (Throwable t) {
log.warn("Trying to auto create topic `{}` failed.", topic, t);
// Close the producer, so it would not spam the metadata fetch failures forever.
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
}
}

return result;
}

protected FetchTopicDescriptionResponse fetchTopicDescription0(FetchTopicDescriptionRequest request) {
final var topic = request.getTopic();
TopicDescription topicDescription = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ public void afterPropertiesSet() {
*/
private Map<NotificationType, NotificationLevel> notificationLevels = new HashMap<>();

/**
* Provides more metrics at performance penalty.
*/
private boolean debugEnabled;

/**
* The default number of partitions in a shard.
*
Expand Down Expand Up @@ -206,21 +201,9 @@ public void afterPropertiesSet() {
@LegacyResolvedValue
private List<String> topics = new ArrayList<>();

/**
* Uses AdminClient to validate topics.
*
* <p>AdminClient allows us to also check if topics have suitable ACLs.
*
* <p>Experimental option.
*
* <p>May be the default in the future.
*/
private boolean useAdminClientForTopicsValidation = false;

/**
* How many topics validations are we doing in parallel, during the initialization of Tkms.
*/
private int adminClientTopicsValidationConcurrency = 10;
@Valid
@jakarta.validation.Valid
private TopicValidation topicValidation = new TopicValidation();

@Valid
@jakarta.validation.Valid
Expand Down Expand Up @@ -549,10 +532,40 @@ public static class Internals {
*/
private Duration flushInterruptionDuration = Duration.ofSeconds(30);

}

@Data
@Accessors(chain = true)
public static class TopicValidation {

/**
* How long do we wait for topics to get pre-validated.
*/
private Duration topicPreValidationTimeout = Duration.ofMinutes(1);

/**
* Uses AdminClient to validate topics.
*
* <p>AdminClient allows us to also check if topics have suitable ACLs.
*
* <p>Experimental option.
*
* <p>May be the default in the future.
*/
private boolean useAdminClient = false;

/**
* How many topics validations are we doing in parallel, during the initialization of Tkms.
*/
private int validationConcurrencyAtInitialization = 10;

/**
* Tries to auto create topic using the producer, when admin client based topic validation fails.
*
* <p>This would allow us still to use the capable admin client based validation in environments
* where topics are expected to be auto created.
*/
private boolean tryToAutoCreateTopics = true;
}

public enum NotificationLevel {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void cleanup() {
((TransactionalKafkaMessageSender) transactionalKafkaMessageSender).setTkmsDaoProvider(tkmsDaoProvider);
tkmsProperties.setDeferMessageRegistrationUntilCommit(false);
tkmsProperties.setValidateSerialization(false);
tkmsProperties.setUseAdminClientForTopicsValidation(false);
tkmsProperties.getTopicValidation().setUseAdminClient(false);
}

protected void setupConfig(boolean deferUntilCommit) {
Expand Down Expand Up @@ -484,7 +484,7 @@ private static Stream<Arguments> unknownTopicsMatrix() {
void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean deferUntilCommit, boolean useAdminClient) {
try {
setupConfig(deferUntilCommit);
tkmsProperties.setUseAdminClientForTopicsValidation(useAdminClient);
tkmsProperties.getTopicValidation().setUseAdminClient(useAdminClient);

var expectedMessage =
useAdminClient ? "Topic 'NotExistingTopic' does not exist." : "Topic NotExistingTopic not present in metadata after";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ private static Stream<Arguments> compressionInput() {
var arguments = new ArrayList<Arguments>();

for (var deferUntilCommit : deferUntilCommits) {
arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 111, 110, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1171, 1171, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 134, 134, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 160, 160, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 158, 158, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 100, 100, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.GZIP, 112, 111, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.NONE, 1172, 1171, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.LZ4, 135, 134, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY, 164, 160, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.SNAPPY_FRAMED, 162, 158, deferUntilCommit));
arguments.add(Arguments.of(CompressionAlgorithm.ZSTD, 101, 100, deferUntilCommit));
}

return arguments.stream();
Expand Down
5 changes: 3 additions & 2 deletions tw-tkms-starter/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ tw-tkms:
ms: 7
earliest-visible-messages:
enabled: true
debug-enabled: true
compression:
algorithm: random
min-size: 10
Expand Down Expand Up @@ -80,8 +79,10 @@ spring:
tw-tkms:
database-dialect: POSTGRES
delete-batch-sizes: "51, 11, 5, 1"
use-admin-client-for-topics-validation: true
topic-validation:
use-admin-client: true

# We will not pre-create the topic in order to test the
tw-tkms-test:
test-topic: TestTopicPostgres

Expand Down
Loading