Commit 5604dc7: Address comments.

Hao Xu committed Sep 18, 2024
1 parent 42639a4 commit 5604dc7

Showing 3 changed files with 114 additions and 33 deletions.
File 1 of 3:
@@ -12,21 +12,11 @@
 import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_CHECKSUM_VERIFICATION_ENABLED;
 import static com.linkedin.venice.ConfigKeys.SERVER_DATABASE_SYNC_BYTES_INTERNAL_FOR_DEFERRED_WRITE_MODE;
 import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.DEFAULT_RE_PUSH_REWIND_IN_SECONDS_OVERRIDE;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.INCREMENTAL_PUSH;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.INPUT_PATH_PROP;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_MAX_RECORDS_PER_MAPPER;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.KAFKA_INPUT_TOPIC;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.PUSH_JOB_STATUS_UPLOAD_ENABLE;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.SEND_CONTROL_MESSAGES_DIRECTLY;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.SOURCE_KAFKA;
-import static com.linkedin.venice.hadoop.VenicePushJobConstants.TARGETED_REGION_PUSH_ENABLED;
+import static com.linkedin.venice.hadoop.VenicePushJobConstants.*;
 import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.D2_SERVICE_NAME;
 import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.PARENT_D2_SERVICE_NAME;
 import static com.linkedin.venice.meta.PersistenceType.ROCKS_DB;
+import static com.linkedin.venice.pubsub.PubSubConstants.*;
 import static com.linkedin.venice.samza.VeniceSystemFactory.DEPLOYMENT_ID;
 import static com.linkedin.venice.samza.VeniceSystemFactory.DOT;
 import static com.linkedin.venice.samza.VeniceSystemFactory.SYSTEMS_PREFIX;
@@ -86,6 +76,7 @@
 import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
+import com.linkedin.venice.pubsub.manager.TopicManager;
 import com.linkedin.venice.samza.VeniceSystemFactory;
 import com.linkedin.venice.samza.VeniceSystemProducer;
 import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
@@ -199,6 +190,7 @@ public void setUp() {
         Optional.of(serverProperties),
         false);
     childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
+
     parentControllers = multiRegionMultiClusterWrapper.getParentControllers();
     VeniceClusterWrapper clusterWrapper =
         multiRegionMultiClusterWrapper.getChildRegions().get(0).getClusters().get(CLUSTER_NAMES[0]);
@@ -575,6 +567,91 @@ public void testActiveActiveForHeartbeatSystemStores() throws Exception {
     });
   }
 
+  @Test(timeOut = TEST_TIMEOUT)
+  public void testMultiDataCenterIncrementalPushWithSeparateTopic() throws Exception {
+    File inputDirInc = getTempDataDirectory();
+
+    motherOfAllTests(
+        "testMultiDataCenterIncrementalPushWithSeparateTopic",
+        updateStoreQueryParams -> updateStoreQueryParams.setPartitionCount(1)
+            .setHybridOffsetLagThreshold(TEST_TIMEOUT)
+            .setHybridRewindSeconds(2L)
+            .setIncrementalPushEnabled(true)
+            .setSeparateRealTimeTopicEnabled(true),
+        100,
+        (parentControllerClient, clusterName, storeName, props, inputDir) -> {
+          try (VenicePushJob job = new VenicePushJob("Batch Push", props)) {
+            job.run();
+            // Verify the Kafka URL returned to the push job is the same as the dc-0 Kafka URL.
+            Assert.assertEquals(job.getKafkaUrl(), childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());
+          }
+
+          props.setProperty(INCREMENTAL_PUSH, "true");
+          props.put(INPUT_PATH_PROP, inputDirInc);
+          props.put(SEND_CONTROL_MESSAGES_DIRECTLY, true);
+
+          TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema2(inputDirInc);
+
+          VeniceClusterWrapper veniceClusterWrapperDC0 = childDatacenters.get(0).getClusters().get(clusterName);
+          TopicManager topicManagerDC0 = IntegrationTestPushUtils
+              .getTopicManagerRepo(
+                  PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE,
+                  100,
+                  0L,
+                  veniceClusterWrapperDC0.getPubSubBrokerWrapper(),
+                  veniceClusterWrapperDC0.getPubSubTopicRepository())
+              .getLocalTopicManager();
+
+          VeniceClusterWrapper veniceClusterWrapperDC1 = childDatacenters.get(1).getClusters().get(clusterName);
+
+          // Print all the Kafka cluster URLs.
+          LOGGER.info("KafkaURL {}:{}", "dc-0", childDatacenters.get(0).getKafkaBrokerWrapper().getAddress());
+          LOGGER.info("KafkaURL {}:{}", "dc-1", childDatacenters.get(1).getKafkaBrokerWrapper().getAddress());
+          LOGGER.info(
+              "KafkaURL {}:{}",
+              "parent",
+              multiRegionMultiClusterWrapper.getParentKafkaBrokerWrapper().getAddress());
+
+          TopicManager topicManagerDC1 = IntegrationTestPushUtils
+              .getTopicManagerRepo(
+                  PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE,
+                  100,
+                  0L,
+                  veniceClusterWrapperDC1.getPubSubBrokerWrapper(),
+                  veniceClusterWrapperDC1.getPubSubTopicRepository())
+              .getLocalTopicManager();
+
+          TopicManager topicManagerParent = IntegrationTestPushUtils
+              .getTopicManagerRepo(
+                  PUBSUB_OPERATION_TIMEOUT_MS_DEFAULT_VALUE,
+                  100,
+                  0L,
+                  multiRegionMultiClusterWrapper.getParentKafkaBrokerWrapper(),
+                  veniceClusterWrapperDC1.getPubSubTopicRepository())
+              .getTopicManager(multiRegionMultiClusterWrapper.getParentKafkaBrokerWrapper().getAddress());
+
+          PubSubTopicPartition versionTopicPartition = new PubSubTopicPartitionImpl(
+              pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(storeName)),
+              0);
+          try (VenicePushJob job = new VenicePushJob("Incremental Push", props)) {
+            CompletableFuture.runAsync(job::run);
+            TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> {
+              Assert.assertTrue(topicManagerDC0.containsTopic(versionTopicPartition.getPubSubTopic()));
+              long offsetDC0 = topicManagerDC0.getLatestOffsetWithRetries(versionTopicPartition, 3);
+              System.out.println(topicManagerDC0.getPubSubClusterAddress() + " Offset topicManagerDC0: " + offsetDC0);
+              long offsetDC1 = topicManagerDC1.getLatestOffsetWithRetries(versionTopicPartition, 3);
+              System.out.println(topicManagerDC1.getPubSubClusterAddress() + " Offset topicManagerDC1: " + offsetDC1);
+              long offsetParent = topicManagerParent.getLatestOffsetWithRetries(versionTopicPartition, 3);
+              System.out.println("Offset topicManagerParent: " + offsetParent);
+              Assert.assertTrue(offsetParent > 10);
+            });
+            job.cancel();
+          }
+        });
+  }
 
   @Test(timeOut = TEST_TIMEOUT)
   public void testMultiDataCenterRePushWithIncrementalPush() throws Exception {
     motherOfAllTests(
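The offset assertions in the new test repeat one pattern per region. Below is a minimal standalone sketch of that check, assuming a TopicManager and a PubSubTopicRepository are already in hand; the helper name and the -1 sentinel are illustrative and not part of this change.

import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.manager.TopicManager;

// Latest offset of partition 0 of a store's separate real-time topic in one
// region, or -1 if that region has not created the topic yet.
static long latestSeparateRtOffset(TopicManager topicManager, PubSubTopicRepository repo, String storeName) {
  PubSubTopicPartition partition =
      new PubSubTopicPartitionImpl(repo.getTopic(Version.composeSeparateRealTimeTopic(storeName)), 0);
  if (!topicManager.containsTopic(partition.getPubSubTopic())) {
    return -1L;
  }
  // Retries smooth over metadata lag right after topic creation.
  return topicManager.getLatestOffsetWithRetries(partition, 3);
}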
File 2 of 3:
@@ -2471,6 +2471,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
     if (setStore.incrementalPushEnabled
         && controllerConfig.enabledSeparateRealTimeTopicForStoreWithIncrementalPush()) {
       setStore.separateRealTimeTopicEnabled = true;
+      updatedConfigsList.add(SEPARATE_REAL_TIME_TOPIC_ENABLED);
     }
 
     // When turning off hybrid store, we will also turn off incremental store config.
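For a store owner, the practical effect of this branch is that enabling incremental push alone now opts a store into the separate real-time topic when the controller flag is on. A minimal usage sketch, assuming a reachable controller; the cluster name, URL, and store name are illustrative:

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;

// With enabledSeparateRealTimeTopicForStoreWithIncrementalPush set on the
// controller, no explicit setSeparateRealTimeTopicEnabled(true) is required:
// the update below should flip separateRealTimeTopicEnabled as well.
try (ControllerClient client =
    ControllerClient.constructClusterControllerClient("venice-cluster0", "http://localhost:2020")) {
  client.updateStore(
      "test-store",
      new UpdateStoreQueryParams().setHybridRewindSeconds(2L)
          .setHybridOffsetLagThreshold(100L)
          .setIncrementalPushEnabled(true));
}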
File 3 of 3:
@@ -131,6 +131,8 @@ void ensurePreconditions(
     if (!hybridStoreConfig.isPresent()) {
       throw new VeniceException("Topic switching is only supported for Hybrid Stores.");
     }
+    Version version =
+        store.getVersion(Version.parseVersionFromKafkaTopicName(topicWhereToSendTheTopicSwitch.getName()));
     /**
      * TopicReplicator is used in child fabrics to create real-time (RT) topic when a child fabric
      * is ready to start buffer replay but RT topic doesn't exist. This scenario could happen for a
@@ -146,45 +148,46 @@
      * doesn't have any existing version or a correct storage quota, we cannot decide the partition
      * number for it.
      */
-    if (!getTopicManager().containsTopicAndAllPartitionsAreOnline(srcTopicName)) {
+    createRealTimeTopicIfNeeded(store, version, srcTopicName, hybridStoreConfig.get());
+    if (version != null && version.isSeparateRealTimeTopicEnabled()) {
+      PubSubTopic separateRealTimeTopic =
+          pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(store.getName()));
+      createRealTimeTopicIfNeeded(store, version, separateRealTimeTopic, hybridStoreConfig.get());
+    }
   }
 
+  void createRealTimeTopicIfNeeded(
+      Store store,
+      Version version,
+      PubSubTopic realTimeTopic,
+      HybridStoreConfig hybridStoreConfig) {
+    if (!getTopicManager().containsTopicAndAllPartitionsAreOnline(realTimeTopic)) {
       int partitionCount;
-      Version version =
-          store.getVersion(Version.parseVersionFromKafkaTopicName(topicWhereToSendTheTopicSwitch.getName()));
       if (version != null) {
         partitionCount = version.getPartitionCount();
       } else {
         partitionCount = store.getPartitionCount();
       }
-      int replicationFactor = srcTopicName.isRealTime() ? kafkaReplicationFactorForRTTopics : kafkaReplicationFactor;
-      Optional<Integer> minISR = srcTopicName.isRealTime() ? minSyncReplicasForRTTopics : Optional.empty();
+      int replicationFactor = realTimeTopic.isRealTime() ? kafkaReplicationFactorForRTTopics : kafkaReplicationFactor;
+      Optional<Integer> minISR = realTimeTopic.isRealTime() ? minSyncReplicasForRTTopics : Optional.empty();
       getTopicManager().createTopic(
-          srcTopicName,
+          realTimeTopic,
           partitionCount,
           replicationFactor,
-          StoreUtils.getExpectedRetentionTimeInMs(store, hybridStoreConfig.get()),
-          false, // Note: do not enable RT compaction! Might make jobs in Online/Offline model stuck
+          StoreUtils.getExpectedRetentionTimeInMs(store, hybridStoreConfig),
+          false,
           minISR,
           false);
-      if (version.isSeparateRealTimeTopicEnabled()) {
-        getTopicManager().createTopic(
-            pubSubTopicRepository.getTopic(Version.composeSeparateRealTimeTopic(store.getName())),
-            partitionCount,
-            replicationFactor,
-            StoreUtils.getExpectedRetentionTimeInMs(store, hybridStoreConfig.get()),
-            false, // Note: do not enable RT compaction! Might make jobs in Online/Offline model stuck
-            minISR,
-            false);
-      }
     } else {
       /**
       * If real-time topic already exists, check whether its retention time is correct.
       */
-      long topicRetentionTimeInMs = getTopicManager().getTopicRetention(srcTopicName);
-      long expectedRetentionTimeMs = StoreUtils.getExpectedRetentionTimeInMs(store, hybridStoreConfig.get());
+      long topicRetentionTimeInMs = getTopicManager().getTopicRetention(realTimeTopic);
+      long expectedRetentionTimeMs = StoreUtils.getExpectedRetentionTimeInMs(store, hybridStoreConfig);
       if (topicRetentionTimeInMs != expectedRetentionTimeMs) {
-        getTopicManager().updateTopicRetention(srcTopicName, expectedRetentionTimeMs);
+        getTopicManager().updateTopicRetention(realTimeTopic, expectedRetentionTimeMs);
       }
     }
 
   }
 
   long getRewindStartTime(
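The new precondition code derives the version from the topic-switch target and composes real-time topic names from the store name. A small sketch of that naming round-trip follows; the store name is illustrative, and Version.composeKafkaTopic is assumed from the same Version class (it does not appear in this diff):

import com.linkedin.venice.meta.Version;

public final class TopicNamingSketch {
  public static void main(String[] args) {
    String storeName = "demo_store"; // illustrative store name
    // Version topics are per-push; parseVersionFromKafkaTopicName recovers the number.
    String versionTopic = Version.composeKafkaTopic(storeName, 1);
    int versionNumber = Version.parseVersionFromKafkaTopicName(versionTopic);
    // A hybrid store's regular real-time topic, and the separate one used for
    // incremental pushes when separateRealTimeTopicEnabled is set on the version.
    String realTimeTopic = Version.composeRealTimeTopic(storeName);
    String separateRealTimeTopic = Version.composeSeparateRealTimeTopic(storeName);
    System.out.println(versionTopic + " -> v" + versionNumber);
    System.out.println(realTimeTopic + " / " + separateRealTimeTopic);
  }
}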
