Skip to content

Commit

Permalink
Fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
onukristo committed Jan 15, 2024
1 parent e466798 commit 2d834c4
Show file tree
Hide file tree
Showing 21 changed files with 231 additions and 48 deletions.
1 change: 0 additions & 1 deletion demoapp/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ spring:
locations: "classpath:db/migration/mysql"
tw-tkms:
polling-interval: 5ms
shards-count: 1
partitions-count: 1
topics:
- ComplexTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ public void preValidateAll() {
try {
phaser.awaitAdvanceInterruptibly(phase, timeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation();
throw new IllegalStateException("Topic validation is taking too long.");
}

if (failures.get() > 0) {
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation();
throw new IllegalStateException("There were failures with topics validations. Refusing to start.");
}
}
Expand Down Expand Up @@ -223,16 +223,18 @@ protected FetchTopicDescriptionResponse fetchTopicDescription(FetchTopicDescript
&& result.getThrowable() instanceof UnknownTopicOrPartitionException
&& tkmsProperties.getTopicValidation().isTryToAutoCreateTopics()) {
final var topic = request.getTopic();
final var shardPartition = request.getShardPartition();

try {
validateUsingProducer(request.getShardPartition(), topic);
validateUsingProducer(shardPartition, topic);

log.info("Succeeded in auto creating topic `{}`", topic);

return fetchTopicDescription0(request);
} catch (Throwable t) {
log.warn("Trying to auto create topic `{}` failed.", topic, t);
// Close the producer, so it would not spam the metadata fetch failures forever.
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(shardPartition);
}
}

Expand All @@ -246,9 +248,8 @@ protected FetchTopicDescriptionResponse fetchTopicDescription0(FetchTopicDescrip
Throwable throwable = null;

try {
topicDescription = tkmsKafkaAdminProvider.getKafkaAdmin().describeTopics(Collections.singleton(topic),
new DescribeTopicsOptions().includeAuthorizedOperations(true))
.allTopicNames().get(30, TimeUnit.SECONDS).get(topic);
topicDescription = tkmsKafkaAdminProvider.getKafkaAdmin(request.getShardPartition()).describeTopics(Collections.singleton(topic),
new DescribeTopicsOptions().includeAuthorizedOperations(true)).allTopicNames().get(30, TimeUnit.SECONDS).get(topic);
} catch (Throwable t) {
if (t instanceof ExecutionException) {
throwable = t.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ protected int getPartition(int shard, TkmsMessage message) {
@Data
@Accessors(chain = true)
protected static class FetchTopicDescriptionRequest {

private TkmsShardPartition shardPartition;
private String topic;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.transferwise.kafka.tkms.config;

import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import org.apache.kafka.clients.admin.Admin;

public interface ITkmsKafkaAdminProvider {

Admin getKafkaAdmin();
Admin getKafkaAdmin(TkmsShardPartition tkmsShardPartition);

void closeKafkaAdmin();
void closeKafkaAdmin(TkmsShardPartition tkmsShardPartition);

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ public interface ITkmsKafkaProducerProvider {

void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase);

void closeKafkaProducerForTopicValidation();
void closeKafkaProducerForTopicValidation(TkmsShardPartition tkmsShardPartition);

void closeKafkaProducersForTopicValidation();

enum UseCase {
PROXY,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.transferwise.kafka.tkms.config;

import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import com.transferwise.kafka.tkms.config.TkmsProperties.ShardProperties;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Data;
Expand All @@ -19,6 +22,7 @@
@Slf4j
public class TkmsKafkaAdminProvider implements ITkmsKafkaAdminProvider, GracefulShutdownStrategy {

private static final Set<String> CONFIG_NAMES = AdminClientConfig.configNames();
/**
* Keep the kafka-clients' MBean registration happy.
*/
Expand All @@ -30,11 +34,11 @@ public class TkmsKafkaAdminProvider implements ITkmsKafkaAdminProvider, Graceful
@Autowired
private MeterRegistry meterRegistry;

private Map<Long, AdminEntry> admins = new ConcurrentHashMap<>();
private Map<TkmsShardPartition, AdminEntry> admins = new ConcurrentHashMap<>();

@Override
public Admin getKafkaAdmin() {
return admins.computeIfAbsent(0L, key -> {
public Admin getKafkaAdmin(TkmsShardPartition tkmsShardPartition) {
return admins.computeIfAbsent(tkmsShardPartition, key -> {
var configs = new HashMap<String, Object>();

configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "Please specify 'tw-tkms.kafka.bootstrap.servers'.");
Expand All @@ -43,13 +47,21 @@ public Admin getKafkaAdmin() {
configs.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, 100);
configs.put(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5000);

var configNames = AdminClientConfig.configNames();
for (var e : tkmsProperties.getKafka().entrySet()) {
if (configNames.contains(e.getKey())) {
if (CONFIG_NAMES.contains(e.getKey())) {
configs.put(e.getKey(), e.getValue());
}
}

ShardProperties shardProperties = tkmsProperties.getShards().get(tkmsShardPartition.getShard());
if (shardProperties != null) {
for (var e : shardProperties.getKafka().entrySet()) {
if (CONFIG_NAMES.contains(e.getKey())) {
configs.put(e.getKey(), e.getValue());
}
}
}

final var admin = KafkaAdminClient.create(configs);
final var kafkaClientMetrics = new KafkaClientMetrics(admin);
kafkaClientMetrics.bindTo(meterRegistry);
Expand All @@ -59,8 +71,8 @@ public Admin getKafkaAdmin() {
}

@Override
public void closeKafkaAdmin() {
var adminEntry = admins.remove(0L);
public void closeKafkaAdmin(TkmsShardPartition tkmsShardPartition) {
var adminEntry = admins.remove(tkmsShardPartition);

if (adminEntry == null) {
return;
Expand All @@ -77,7 +89,7 @@ public void closeKafkaAdmin() {

@Override
public void applicationTerminating() {
closeKafkaAdmin();
admins.keySet().forEach((this::closeKafkaAdmin));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Data;
Expand All @@ -23,6 +24,8 @@
@Slf4j
public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, GracefulShutdownStrategy {

private static final Set<String> CONFIG_NAMES = ProducerConfig.configNames();

/**
* Keep the kafka-clients' MBean registration happy.
*/
Expand Down Expand Up @@ -66,11 +69,19 @@ public KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPa
}
configs.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "120000");

configs.putAll(tkmsProperties.getKafka());
for (var e : tkmsProperties.getKafka().entrySet()) {
if (CONFIG_NAMES.contains(e.getKey())) {
configs.put(e.getKey(), e.getValue());
}
}

ShardProperties shardProperties = tkmsProperties.getShards().get(shardPartition.getShard());
if (shardProperties != null) {
configs.putAll(shardProperties.getKafka());
for (var e : shardProperties.getKafka().entrySet()) {
if (CONFIG_NAMES.contains(e.getKey())) {
configs.put(e.getKey(), e.getValue());
}
}
}

final var producer = new KafkaProducer<String, byte[]>(configs);
Expand Down Expand Up @@ -104,8 +115,14 @@ public void closeKafkaProducer(TkmsShardPartition shardPartition, UseCase useCas
}

@Override
public void closeKafkaProducerForTopicValidation() {
closeKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION);
public void closeKafkaProducerForTopicValidation(TkmsShardPartition tkmsShardPartition) {
closeKafkaProducer(tkmsShardPartition, UseCase.TOPIC_VALIDATION);
}

@Override
public void closeKafkaProducersForTopicValidation() {
producers.keySet().stream().filter(key -> key.getRight() == UseCase.TOPIC_VALIDATION)
.forEach(key -> closeKafkaProducer(key.getLeft(), key.getRight()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void testIfEarliestMessageTrackerBehavesAsExpected() {
earliestMessageTracker.init();

assertThat(earliestMessageTracker.getEarliestMessageId()).isEqualTo(committedValue);

assertThat(pollingAllRecordsHappened);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean
.hasMessageContaining(expectedMessage);
} finally {
// Stop logs spam about not existing topic in metadata.
tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation();
tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation();
}
}

Expand Down Expand Up @@ -702,7 +702,7 @@ void testThatSendingLargeMessagesWillNotCauseAnIssue(boolean deferUntilCommit) {
if (receivedValue.equals(message.getValue())) {
receivedCount.incrementAndGet();
} else {
throw new IllegalStateException("Wrong message received.");
throw new IllegalStateException("Wrong message received: " + receivedValue);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

class KafkaMetricsIntTest extends BaseIntTest {
class KafkaServerMetricsIntTest extends BaseIntTest {


@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.transferwise.kafka.tkms.test.TestMessagesInterceptor;
import com.transferwise.kafka.tkms.test.TestProperties;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.transferwise.kafka.tkms;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper;
import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider;
import com.transferwise.kafka.tkms.test.BaseIntTest;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;

public class MultiServerTopicValidationIntTest extends BaseIntTest {

@Autowired
private ITransactionalKafkaMessageSender transactionalKafkaMessageSender;
@Autowired
private ITransactionsHelper transactionsHelper;
@Autowired
private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider;

@AfterEach
public void cleanup() {
super.cleanup();

tkmsProperties.getTopicValidation().setUseAdminClient(false);
tkmsProperties.getTopicValidation().setTryToAutoCreateTopics(true);
}

private static Stream<Arguments> unknownTopicsMatrix() {
return Stream.of(
Arguments.of(true),
Arguments.of(false)
);
}

@ParameterizedTest
@MethodSource("unknownTopicsMatrix")
@SneakyThrows
void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean useAdminClient) {
tkmsProperties.getTopicValidation().setUseAdminClient(useAdminClient);

final var notExistingTopic = "NotExistingTopic";
final var existingTopicInKafka2 = "TestTopicInAnotherServer";

assertThatThrownBy(() -> sendMessage(notExistingTopic, null)).hasMessageContaining(notExistingTopic);

sendMessage("TestTopic", null);
sendMessage("TestTopic", tkmsProperties.getDefaultShard());

assertThatThrownBy(() -> sendMessage(existingTopicInKafka2, null)).hasMessageContaining(existingTopicInKafka2);

sendMessage(existingTopicInKafka2, 2);
}

protected void sendMessage(String topic, Integer shard) {
try {
transactionsHelper.withTransaction().run(() -> {
var message = new TkmsMessage().setTopic(topic).setValue("Stuff".getBytes(StandardCharsets.UTF_8));
if (shard != null) {
message.setShard(shard);
}
transactionalKafkaMessageSender.sendMessage(message);
});
} catch (Exception e) {
throw new RuntimeException("Sending message to topic '" + topic + "' and shard " + shard + " failed.");
} finally {
// Stop spam of metadata errors.
tkmsKafkaProducerProvider.closeKafkaProducersForTopicValidation();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.List;
import org.junit.jupiter.api.Test;

class TransactionalKafkaMessageSenderTest {
class TransactionalKafkaMessageSenderTestServer {

@Test
void deleteBatchSizesAreCorrectlyValidated() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

class TkmsKafkaProducerProviderTest extends BaseIntTest {
class TkmsKafkaProducerProviderTestServer extends BaseIntTest {

@Autowired
private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public InsertMessageResult insertMessage(TkmsShardPartition shardPartition, Tkms
throw new IllegalStateException("Haha, inserts are failing lol.");
}
}

return delegate.insertMessage(shardPartition, message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void tearDownClass() {
@ProductionBug("Delete worked, but batches were combined wrongly.")
void deletingInBatchesWorks() {
var tkmsDao = tkmsDaoProvider.getTkmsDao(0);

List<Long> records = new ArrayList<>();
for (int i = 0; i < 1001; i++) {
records.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public void setup() {
}
meterCache.clear();


TkmsClockHolder.reset();
}

Expand Down
Loading

0 comments on commit 2d834c4

Please sign in to comment.