diff --git a/demoapp/src/main/resources/application.yml b/demoapp/src/main/resources/application.yml index 55dc64d..f587be3 100644 --- a/demoapp/src/main/resources/application.yml +++ b/demoapp/src/main/resources/application.yml @@ -28,7 +28,6 @@ spring: locations: "classpath:db/migration/mysql" tw-tkms: polling-interval: 5ms - shards-count: 1 partitions-count: 1 topics: - ComplexTest diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java index ce7cea8..5f9bde9 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsTopicValidator.java @@ -142,12 +142,12 @@ public void preValidateAll() { try { phaser.awaitAdvanceInterruptibly(phase, timeoutMs, TimeUnit.MILLISECONDS); } catch (InterruptedException | TimeoutException e) { - tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); + tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation(); throw new IllegalStateException("Topic validation is taking too long."); } if (failures.get() > 0) { - tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); + tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation(); throw new IllegalStateException("There were failures with topics validations. Refusing to start."); } } @@ -223,8 +223,10 @@ protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescript && result.getThrowable() instanceof UnknownTopicOrPartitionException && tkmsProperties.getTopicValidation().isTryToAutoCreateTopics()) { final var topic = request.getTopic(); + final var shardPartition = request.getShardPartition(); + try { - validateUsingProducer(request.getShardPartition(), topic); + validateUsingProducer(shardPartition, topic); log.info("Succeeded in auto creating topic `{}`", topic); @@ -232,7 +234,7 @@ protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescript } catch (Throwable t) { log.warn("Trying to auto create topic `{}` failed.", topic, t); // Close the producer, so it would not spam the metadata fetch failures forever. - tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); + tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(shardPartition); } } @@ -246,9 +248,8 @@ protected FetchTopicDescriptionResponse fetchTopicDescription0(FetchTopicDescrip Throwable throwable = null; try { - topicDescription = tkmsKafkaAdminProvider.getKafkaAdmin().describeTopics(Collections.singleton(topic), - new DescribeTopicsOptions().includeAuthorizedOperations(true)) - .allTopicNames().get(30, TimeUnit.SECONDS).get(topic); + topicDescription = tkmsKafkaAdminProvider.getKafkaAdmin(request.getShardPartition()).describeTopics(Collections.singleton(topic), + new DescribeTopicsOptions().includeAuthorizedOperations(true)).allTopicNames().get(30, TimeUnit.SECONDS).get(topic); } catch (Throwable t) { if (t instanceof ExecutionException) { throwable = t.getCause(); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index 131351a..b681507 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -519,6 +519,7 @@ protected int getPartition(int shard, TkmsMessage message) { @Data @Accessors(chain = true) protected static class FetchTopicDescriptionRequest { + private TkmsShardPartition shardPartition; private String topic; } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaAdminProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaAdminProvider.java index e533be4..2b6f925 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaAdminProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaAdminProvider.java @@ -1,11 +1,12 @@ package com.transferwise.kafka.tkms.config; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; import org.apache.kafka.clients.admin.Admin; public interface ITkmsKafkaAdminProvider { - Admin getKafkaAdmin(); + Admin getKafkaAdmin(TkmsShardPartition tkmsShardPartition); - void closeKafkaAdmin(); + void closeKafkaAdmin(TkmsShardPartition tkmsShardPartition); } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java index d98aa53..3473305 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java @@ -11,7 +11,9 @@ public interface ITkmsKafkaProducerProvider { void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); - void closeKafkaProducerForTopicValidation(); + void closeKafkaProducerForTopicValidation(TkmsShardPartition tkmsShardPartition); + + void closeKafkaProducersForTopicValidation(); enum UseCase { PROXY, diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaAdminProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaAdminProvider.java index e0f8451..7a1367f 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaAdminProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaAdminProvider.java @@ -1,11 +1,14 @@ package com.transferwise.kafka.tkms.config; import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; +import com.transferwise.kafka.tkms.config.TkmsProperties.ShardProperties; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import lombok.Data; @@ -19,6 +22,7 @@ @Slf4j public class TkmsKafkaAdminProvider implements ITkmsKafkaAdminProvider, GracefulShutdownStrategy { + private static final Set CONFIG_NAMES = AdminClientConfig.configNames(); /** * Keep the kafka-clients' MBean registration happy. */ @@ -30,11 +34,11 @@ public class TkmsKafkaAdminProvider implements ITkmsKafkaAdminProvider, Graceful @Autowired private MeterRegistry meterRegistry; - private Map admins = new ConcurrentHashMap<>(); + private Map admins = new ConcurrentHashMap<>(); @Override - public Admin getKafkaAdmin() { - return admins.computeIfAbsent(0L, key -> { + public Admin getKafkaAdmin(TkmsShardPartition tkmsShardPartition) { + return admins.computeIfAbsent(tkmsShardPartition, key -> { var configs = new HashMap(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "Please specify 'tw-tkms.kafka.bootstrap.servers'."); @@ -43,13 +47,21 @@ public Admin getKafkaAdmin() { configs.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, 100); configs.put(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5000); - var configNames = AdminClientConfig.configNames(); for (var e : tkmsProperties.getKafka().entrySet()) { - if (configNames.contains(e.getKey())) { + if (CONFIG_NAMES.contains(e.getKey())) { configs.put(e.getKey(), e.getValue()); } } + ShardProperties shardProperties = tkmsProperties.getShards().get(tkmsShardPartition.getShard()); + if (shardProperties != null) { + for (var e : shardProperties.getKafka().entrySet()) { + if (CONFIG_NAMES.contains(e.getKey())) { + configs.put(e.getKey(), e.getValue()); + } + } + } + final var admin = KafkaAdminClient.create(configs); final var kafkaClientMetrics = new KafkaClientMetrics(admin); kafkaClientMetrics.bindTo(meterRegistry); @@ -59,8 +71,8 @@ public Admin getKafkaAdmin() { } @Override - public void closeKafkaAdmin() { - var adminEntry = admins.remove(0L); + public void closeKafkaAdmin(TkmsShardPartition tkmsShardPartition) { + var adminEntry = admins.remove(tkmsShardPartition); if (adminEntry == null) { return; @@ -77,7 +89,7 @@ public void closeKafkaAdmin() { @Override public void applicationTerminating() { - closeKafkaAdmin(); + admins.keySet().forEach((this::closeKafkaAdmin)); } @Override diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index 7662c56..21c98c4 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -8,6 +8,7 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import lombok.Data; @@ -23,6 +24,8 @@ @Slf4j public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, GracefulShutdownStrategy { + private static final Set CONFIG_NAMES = ProducerConfig.configNames(); + /** * Keep the kafka-clients' MBean registration happy. */ @@ -66,11 +69,19 @@ public KafkaProducer getKafkaProducer(TkmsShardPartition shardPa } configs.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "120000"); - configs.putAll(tkmsProperties.getKafka()); + for (var e : tkmsProperties.getKafka().entrySet()) { + if (CONFIG_NAMES.contains(e.getKey())) { + configs.put(e.getKey(), e.getValue()); + } + } ShardProperties shardProperties = tkmsProperties.getShards().get(shardPartition.getShard()); if (shardProperties != null) { - configs.putAll(shardProperties.getKafka()); + for (var e : shardProperties.getKafka().entrySet()) { + if (CONFIG_NAMES.contains(e.getKey())) { + configs.put(e.getKey(), e.getValue()); + } + } } final var producer = new KafkaProducer(configs); @@ -104,8 +115,14 @@ public void closeKafkaProducer(TkmsShardPartition shardPartition, UseCase useCas } @Override - public void closeKafkaProducerForTopicValidation() { - closeKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION); + public void closeKafkaProducerForTopicValidation(TkmsShardPartition tkmsShardPartition) { + closeKafkaProducer(tkmsShardPartition, UseCase.TOPIC_VALIDATION); + } + + @Override + public void closeKafkaProducersForTopicValidation() { + producers.keySet().stream().filter(key -> key.getRight() == UseCase.TOPIC_VALIDATION) + .forEach(key -> closeKafkaProducer(key.getLeft(), key.getRight())); } @Override diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java index af5bd4b..3a6796b 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EarliestMessageTrackingIntTest.java @@ -100,7 +100,7 @@ void testIfEarliestMessageTrackerBehavesAsExpected() { earliestMessageTracker.init(); assertThat(earliestMessageTracker.getEarliestMessageId()).isEqualTo(committedValue); - + assertThat(pollingAllRecordsHappened); } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java index 36c559c..5747db8 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java @@ -494,7 +494,7 @@ void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean .hasMessageContaining(expectedMessage); } finally { // Stop logs spam about not existing topic in metadata. - tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); + tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation(); } } @@ -702,7 +702,7 @@ void testThatSendingLargeMessagesWillNotCauseAnIssue(boolean deferUntilCommit) { if (receivedValue.equals(message.getValue())) { receivedCount.incrementAndGet(); } else { - throw new IllegalStateException("Wrong message received."); + throw new IllegalStateException("Wrong message received: " + receivedValue); } }); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaMetricsIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaServerMetricsIntTest.java similarity index 96% rename from tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaMetricsIntTest.java rename to tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaServerMetricsIntTest.java index 952fa63..fe2f9a0 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaMetricsIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaServerMetricsIntTest.java @@ -12,7 +12,7 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -class KafkaMetricsIntTest extends BaseIntTest { +class KafkaServerMetricsIntTest extends BaseIntTest { @Autowired diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java index fcf7a0c..29b2214 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MessagesInterceptionTest.java @@ -11,7 +11,6 @@ import com.transferwise.kafka.tkms.test.TestMessagesInterceptor; import com.transferwise.kafka.tkms.test.TestProperties; import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MultiServerTopicValidationIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MultiServerTopicValidationIntTest.java new file mode 100644 index 0000000..0773705 --- /dev/null +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/MultiServerTopicValidationIntTest.java @@ -0,0 +1,78 @@ +package com.transferwise.kafka.tkms; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; +import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender; +import com.transferwise.kafka.tkms.api.TkmsMessage; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; +import com.transferwise.kafka.tkms.test.BaseIntTest; +import java.nio.charset.StandardCharsets; +import java.util.stream.Stream; +import lombok.SneakyThrows; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.springframework.beans.factory.annotation.Autowired; + +public class MultiServerTopicValidationIntTest extends BaseIntTest { + + @Autowired + private ITransactionalKafkaMessageSender transactionalKafkaMessageSender; + @Autowired + private ITransactionsHelper transactionsHelper; + @Autowired + private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider; + + @AfterEach + public void cleanup() { + super.cleanup(); + + tkmsProperties.getTopicValidation().setUseAdminClient(false); + tkmsProperties.getTopicValidation().setTryToAutoCreateTopics(true); + } + + private static Stream unknownTopicsMatrix() { + return Stream.of( + Arguments.of(true), + Arguments.of(false) + ); + } + + @ParameterizedTest + @MethodSource("unknownTopicsMatrix") + @SneakyThrows + void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean useAdminClient) { + tkmsProperties.getTopicValidation().setUseAdminClient(useAdminClient); + + final var notExistingTopic = "NotExistingTopic"; + final var existingTopicInKafka2 = "TestTopicInAnotherServer"; + + assertThatThrownBy(() -> sendMessage(notExistingTopic, null)).hasMessageContaining(notExistingTopic); + + sendMessage("TestTopic", null); + sendMessage("TestTopic", tkmsProperties.getDefaultShard()); + + assertThatThrownBy(() -> sendMessage(existingTopicInKafka2, null)).hasMessageContaining(existingTopicInKafka2); + + sendMessage(existingTopicInKafka2, 2); + } + + protected void sendMessage(String topic, Integer shard) { + try { + transactionsHelper.withTransaction().run(() -> { + var message = new TkmsMessage().setTopic(topic).setValue("Stuff".getBytes(StandardCharsets.UTF_8)); + if (shard != null) { + message.setShard(shard); + } + transactionalKafkaMessageSender.sendMessage(message); + }); + } catch (Exception e) { + throw new RuntimeException("Sending message to topic '" + topic + "' and shard " + shard + " failed."); + } finally { + // Stop spam of metadata errors. + tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation(); + } + } +} diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSenderTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSenderTestServer.java similarity index 95% rename from tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSenderTest.java rename to tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSenderTestServer.java index 0580ee9..3402ed3 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSenderTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSenderTestServer.java @@ -6,7 +6,7 @@ import java.util.List; import org.junit.jupiter.api.Test; -class TransactionalKafkaMessageSenderTest { +class TransactionalKafkaMessageSenderTestServer { @Test void deleteBatchSizesAreCorrectlyValidated() { diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java similarity index 94% rename from tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTest.java rename to tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java index 514e4b2..cdbf3b2 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTestServer.java @@ -11,7 +11,7 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -class TkmsKafkaProducerProviderTest extends BaseIntTest { +class TkmsKafkaProducerProviderTestServer extends BaseIntTest { @Autowired private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider; diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/dao/FaultInjectedTkmsDao.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/dao/FaultInjectedTkmsDao.java index af2a2b0..4d4bffa 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/dao/FaultInjectedTkmsDao.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/dao/FaultInjectedTkmsDao.java @@ -27,7 +27,7 @@ public InsertMessageResult insertMessage(TkmsShardPartition shardPartition, Tkms throw new IllegalStateException("Haha, inserts are failing lol."); } } - + return delegate.insertMessage(shardPartition, message); } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/dao/TkmsDaoIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/dao/TkmsDaoIntTest.java index e7c0866..5abfaa8 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/dao/TkmsDaoIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/dao/TkmsDaoIntTest.java @@ -48,7 +48,7 @@ void tearDownClass() { @ProductionBug("Delete worked, but batches were combined wrongly.") void deletingInBatchesWorks() { var tkmsDao = tkmsDaoProvider.getTkmsDao(0); - + List records = new ArrayList<>(); for (int i = 0; i < 1001; i++) { records.add( diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java index 52ca713..c3b80f2 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java @@ -61,7 +61,6 @@ public void setup() { } meterCache.clear(); - TkmsClockHolder.reset(); } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/KafkaConfiguration.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/KafkaConfiguration.java index 3d29858..8c93059 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/KafkaConfiguration.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/KafkaConfiguration.java @@ -1,11 +1,15 @@ package com.transferwise.kafka.tkms.test; import com.transferwise.common.baseutils.ExceptionUtils; +import com.transferwise.kafka.tkms.test.TestProperties.KafkaServer; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutionException; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.NewTopic; @@ -28,14 +32,25 @@ public class KafkaConfiguration implements InitializingBean { @SuppressFBWarnings("RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE") @Override public void afterPropertiesSet() { - try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) { - deleteTopic(adminClient, tkmsProperties.getTestTopic()); - } - try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) { - createTopic(adminClient, tkmsProperties.getTestTopic(), 10); + for (var kafkaServer : tkmsProperties.getKafkaServers()) { + var props = getKafkaProperties(kafkaServer); + for (var testTopic : kafkaServer.getTopics()) { + try (AdminClient adminClient = AdminClient.create(props)) { + deleteTopic(adminClient, testTopic); + } + try (AdminClient adminClient = AdminClient.create(props)) { + createTopic(adminClient, testTopic, 10); + } + } } } + protected Map getKafkaProperties(KafkaServer kafkaServer) { + var props = new HashMap<>(kafkaAdmin.getConfigurationProperties()); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer.getBootstrapServers()); + return props; + } + protected void deleteTopic(AdminClient adminClient, String topicName) { try { final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName)); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestProperties.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestProperties.java index cefe9de..04afaf7 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestProperties.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/TestProperties.java @@ -1,5 +1,7 @@ package com.transferwise.kafka.tkms.test; +import java.util.ArrayList; +import java.util.List; import lombok.Data; import lombok.experimental.Accessors; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -12,4 +14,14 @@ public class TestProperties { private String testTopic = "MyTestTopic"; + + private List kafkaServers = new ArrayList<>(); + + @Data + public static class KafkaServer { + + private String bootstrapServers; + + private List topics = new ArrayList<>(); + } } diff --git a/tw-tkms-starter/src/test/resources/application.yml b/tw-tkms-starter/src/test/resources/application.yml index ab2b794..69cc451 100644 --- a/tw-tkms-starter/src/test/resources/application.yml +++ b/tw-tkms-starter/src/test/resources/application.yml @@ -12,7 +12,7 @@ spring: transaction-isolation: 2 kafka: - bootstrap-servers: "${TW_TKMS_KAFKA_TCP_HOST:localhost}:${TW_TKMS_KAFKA_TCP_9092}" + bootstrap-servers: "${TW_TKMS_KAFKA_1_TCP_HOST:localhost}:${TW_TKMS_KAFKA_1_TCP_9092}" consumer: group-id: ${spring.application.name} value-deserializer: org.apache.kafka.common.serialization.StringDeserializer @@ -29,12 +29,12 @@ tw-curator: tw-tkms: polling-interval: 5ms - shards-count: 2 + shards-count: 3 insert-batch-size: 2 topics: - TestTopic kafka: - bootstrap.servers: "${TW_TKMS_KAFKA_TCP_HOST:localhost}:${TW_TKMS_KAFKA_TCP_9092}" + bootstrap.servers: "${TW_TKMS_KAFKA_1_TCP_HOST:localhost}:${TW_TKMS_KAFKA_1_TCP_9092}" shards: 1: polling-interval: 6ms @@ -45,6 +45,9 @@ tw-tkms: enabled: true topics: - TestTopic + 2: + kafka: + bootstrap.servers: "${TW_TKMS_KAFKA_2_TCP_HOST:localhost}:${TW_TKMS_KAFKA_2_TCP_9092}" compression: algorithm: random min-size: 10 @@ -54,6 +57,8 @@ tw-tkms: start-delay: 0s internals: assertion-level: 1 + topic-validation: + try-to-auto-create-topics: false tw-graceful-shutdown: clients-reaction-time-ms: 1000 @@ -64,6 +69,13 @@ logging.level: tw-tkms-test: test-topic: TestTopic + kafka-servers: + - bootstrap-servers: "${TW_TKMS_KAFKA_1_TCP_HOST:localhost}:${TW_TKMS_KAFKA_1_TCP_9092}" + topics: + - TestTopic + - bootstrap-servers: "${TW_TKMS_KAFKA_2_TCP_HOST:localhost}:${TW_TKMS_KAFKA_2_TCP_9092}" + topics: + - TestTopicInAnotherServer --- spring: @@ -89,6 +101,13 @@ tw-tkms: # We will not pre-create the topic in order to test the tw-tkms-test: test-topic: TestTopicPostgres + kafka-servers: + - bootstrap-servers: "${TW_TKMS_KAFKA_1_TCP_HOST:localhost}:${TW_TKMS_KAFKA_1_TCP_9092}" + topics: + - TestTopicPostgres + - bootstrap-servers: "${TW_TKMS_KAFKA_2_TCP_HOST:localhost}:${TW_TKMS_KAFKA_2_TCP_9092}" + topics: + - TestTopicPostgresAnotherServer --- @@ -112,7 +131,6 @@ tw-tkms: enabled: true look-back-period: 10s table-name: earliestmessage.tw_tkms_earliest_visible_messages - shards-count: 2 table-base-name: earliestmessage.outgoing_message monitoring: left-over-messages-check-start-delay: 1s @@ -122,4 +140,8 @@ tw-tkms: table-name: earliestmessage.tw_tkms_earliest_visible_messages tw-tkms-test: - test-topic: TestTopicEarliestMessage \ No newline at end of file + test-topic: TestTopicEarliestMessage + kafka-servers: + - bootstrap-servers: "${TW_TKMS_KAFKA_1_TCP_HOST:localhost}:${TW_TKMS_KAFKA_1_TCP_9092}" + topics: + - TestTopicEarliestMessage \ No newline at end of file diff --git a/tw-tkms-starter/src/test/resources/docker-compose.yml b/tw-tkms-starter/src/test/resources/docker-compose.yml index 8895407..23021f7 100644 --- a/tw-tkms-starter/src/test/resources/docker-compose.yml +++ b/tw-tkms-starter/src/test/resources/docker-compose.yml @@ -8,16 +8,41 @@ services: environment: ALLOW_ANONYMOUS_LOGIN: "yes" JVMFLAGS: -server -Xms25m -Xmx512m -XX:+HeapDumpOnOutOfMemoryError -XX:GCHeapFreeLimit=5 -XX:GCTimeLimit=90 -XX:SoftRefLRUPolicyMSPerMB=5 -XX:MinHeapFreeRatio=10 -XX:MaxHeapFreeRatio=20 -XX:+ExplicitGCInvokesConcurrent - kafka: + kafka1: image: wurstmeister/kafka:2.12-2.4.1 ports: - "9092" - container_name: "tw_tkms_kafka" + container_name: "tw_tkms_kafka_1" environment: - PORT_COMMAND: "docker port $$(docker ps -q -f name=tw_tkms_kafka) 9092/tcp | cut -d: -f2" + PORT_COMMAND: "docker port $$(docker ps -q -f name=tw_tkms_kafka_1) 9092/tcp | cut -d: -f2" KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://localhost:_{PORT_COMMAND},INTERNAL://kafka:9093" + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka1 + KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://localhost:_{PORT_COMMAND},INTERNAL://kafka1:9093" + KAFKA_LISTENERS: EXTERNAL://:9092,INTERNAL://:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_MESSAGE_MAX_BYTES: 10485760 + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 20000 + KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "true" + KAFKA_LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 5 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + LOG4J_LOGGER_ORG: WARN,STDOUT + LOG4J_LOGGER_ORG_APACHE_KAFKA: WARN,STDOUT + LOG4J_LOGGER_KAFKA: WARN,STDOUT + KAFKA_JVM_PERFORMANCE_OPTS: -server -Xms25m -Xmx512m -XX:+HeapDumpOnOutOfMemoryError -XX:GCHeapFreeLimit=5 -XX:GCTimeLimit=90 -XX:SoftRefLRUPolicyMSPerMB=5 -XX:MinHeapFreeRatio=10 -XX:MaxHeapFreeRatio=20 -XX:+ExplicitGCInvokesConcurrent + volumes: + - /var/run/docker.sock:/var/run/docker.sock + kafka2: + image: wurstmeister/kafka:2.12-2.4.1 + ports: + - "9092" + container_name: "tw_tkms_kafka_2" + environment: + PORT_COMMAND: "docker port $$(docker ps -q -f name=tw_tkms_kafka_2) 9092/tcp | cut -d: -f2" + KAFKA_BROKER_ID: 2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka2 + KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://localhost:_{PORT_COMMAND},INTERNAL://kafka2:9093" KAFKA_LISTENERS: EXTERNAL://:9092,INTERNAL://:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL