Skip to content

Commit

Permalink
[controller][common] Implemente separate real-time topic functionality.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hao Xu committed Sep 12, 2024
1 parent c25c658 commit 670f703
Show file tree
Hide file tree
Showing 20 changed files with 1,715 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class ControllerApiConstants {
public static final String CHUNKING_ENABLED = "chunking_enabled";
public static final String RMD_CHUNKING_ENABLED = "rmd_chunking_enabled";
public static final String INCREMENTAL_PUSH_ENABLED = "incremental_push_enabled";
public static final String SEPARATE_REAL_TIME_TOPIC_ENABLED = "separate_realtime_topic_enabled";
public static final String SINGLE_GET_ROUTER_CACHE_ENABLED = "single_get_router_cache_enabled";
public static final String BATCH_GET_ROUTER_CACHE_ENABLED = "batch_get_router_cache_enabled";
public static final String BATCH_GET_LIMIT = "batch_get_limit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPLICATION_METADATA_PROTOCOL_VERSION_ID;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REWIND_TIME_IN_SECONDS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.RMD_CHUNKING_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.SEPARATE_REAL_TIME_TOPIC_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_NODE_READ_QUOTA_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_QUOTA_IN_BYTE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_MIGRATION;
Expand Down Expand Up @@ -406,6 +407,14 @@ public Optional<Boolean> getIncrementalPushEnabled() {
return getBoolean(INCREMENTAL_PUSH_ENABLED);
}

public UpdateStoreQueryParams setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
return putBoolean(SEPARATE_REAL_TIME_TOPIC_ENABLED, separateRealTimeTopicEnabled);
}

public Optional<Boolean> getSeparateRealTimeTopicEnabled() {
return getBoolean(SEPARATE_REAL_TIME_TOPIC_ENABLED);
}

public UpdateStoreQueryParams setBatchGetLimit(int batchGetLimit) {
return putInteger(BATCH_GET_LIMIT, batchGetLimit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ private void addVersion(Version version, boolean checkDisableWrite, boolean isCl

version.setIncrementalPushEnabled(isIncrementalPushEnabled());

version.setSeparateRealTimeTopicEnabled(isSeparateRealTimeTopicEnabled());

version.setBlobTransferEnabled(isBlobTransferEnabled());

version.setUseVersionLevelIncrementalPushEnabled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return this.delegate.isSeparateRealTimeTopicEnabled();
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isBlobTransferEnabled() {
return this.delegate.isBlobTransferEnabled();
Expand Down Expand Up @@ -955,6 +965,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return this.delegate.isSeparateRealTimeTopicEnabled();
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isAccessControlled() {
return this.delegate.isAccessControlled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ static boolean isSystemStore(String storeName) {

void setIncrementalPushEnabled(boolean incrementalPushEnabled);

boolean isSeparateRealTimeTopicEnabled();

void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled);

boolean isAccessControlled();

void setAccessControlled(boolean accessControlled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
throwUnsupportedOperationException("setIncrementalPushEnabled");
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return zkSharedStore.isSeparateRealTimeTopicEnabled();
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
throwUnsupportedOperationException("setSeparateRealTimeTopicEnabled");
}

@Override
public boolean isAccessControlled() {
return zkSharedStore.isAccessControlled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface Version extends Comparable<Version>, DataModelBackedStructure<S
String VERSION_SEPARATOR = "_v";
String REAL_TIME_TOPIC_SUFFIX = "_rt";
String STREAM_REPROCESSING_TOPIC_SUFFIX = "_sr";

String INCREMENTAL_PUSH_REAL_TIME_TOPIC_SUFFIX = "_rt_sep";
/**
* Special number indicating no replication metadata version is set.
*/
Expand Down Expand Up @@ -162,6 +162,10 @@ default void setLeaderFollowerModelEnabled(boolean leaderFollowerModelEnabled) {

void setIncrementalPushEnabled(boolean incrementalPushEnabled);

boolean isSeparateRealTimeTopicEnabled();

void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled);

boolean isBlobTransferEnabled();

void setBlobTransferEnabled(boolean blobTransferEnabled);
Expand Down Expand Up @@ -289,6 +293,10 @@ static String composeRealTimeTopic(String storeName) {
return storeName + REAL_TIME_TOPIC_SUFFIX;
}

static String composeSeparateRealTimeTopic(String storeName) {
return storeName + INCREMENTAL_PUSH_REAL_TIME_TOPIC_SUFFIX;
}

static String composeStreamReprocessingTopic(String storeName, int versionNumber) {
return composeKafkaTopic(storeName, versionNumber) + STREAM_REPROCESSING_TOPIC_SUFFIX;
}
Expand All @@ -308,7 +316,10 @@ static String parseStoreFromRealTimeTopic(String kafkaTopic) {
if (!isRealTimeTopic(kafkaTopic)) {
throw new VeniceException("Kafka topic: " + kafkaTopic + " is not a real-time topic");
}
return kafkaTopic.substring(0, kafkaTopic.length() - REAL_TIME_TOPIC_SUFFIX.length());
if (kafkaTopic.endsWith(REAL_TIME_TOPIC_SUFFIX)) {
return kafkaTopic.substring(0, kafkaTopic.length() - REAL_TIME_TOPIC_SUFFIX.length());
}
return kafkaTopic.substring(0, kafkaTopic.length() - INCREMENTAL_PUSH_REAL_TIME_TOPIC_SUFFIX.length());
}

static String parseStoreFromStreamReprocessingTopic(String kafkaTopic) {
Expand Down Expand Up @@ -337,7 +348,7 @@ static String parseStoreFromKafkaTopicName(String kafkaTopic) {
}

static boolean isRealTimeTopic(String kafkaTopic) {
return kafkaTopic.endsWith(REAL_TIME_TOPIC_SUFFIX);
return kafkaTopic.endsWith(REAL_TIME_TOPIC_SUFFIX) || kafkaTopic.endsWith(INCREMENTAL_PUSH_REAL_TIME_TOPIC_SUFFIX);
}

static boolean isStreamReprocessingTopic(String kafkaTopic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
this.storeVersion.incrementalPushEnabled = incrementalPushEnabled;
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return storeVersion.separateRealTimeTopicEnabled;
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
this.storeVersion.setSeparateRealTimeTopicEnabled(separateRealTimeTopicEnabled);
}

@Override
public boolean isBlobTransferEnabled() {
return this.storeVersion.blobTransferEnabled;
Expand Down Expand Up @@ -450,6 +460,7 @@ public Version cloneVersion() {
clonedVersion.setReplicationFactor(getReplicationFactor());
clonedVersion.setNativeReplicationSourceFabric(getNativeReplicationSourceFabric());
clonedVersion.setIncrementalPushEnabled(isIncrementalPushEnabled());
clonedVersion.setSeparateRealTimeTopicEnabled(isSeparateRealTimeTopicEnabled());
clonedVersion.setUseVersionLevelIncrementalPushEnabled(isUseVersionLevelIncrementalPushEnabled());
clonedVersion.setHybridStoreConfig(getHybridStoreConfig());
clonedVersion.setUseVersionLevelHybridConfig(isUseVersionLevelHybridConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ public ZKStore(Store store) {
setBatchGetLimit(store.getBatchGetLimit());
setNumVersionsToPreserve(store.getNumVersionsToPreserve());
setIncrementalPushEnabled(store.isIncrementalPushEnabled());
setSeparateRealTimeTopicEnabled(store.isSeparateRealTimeTopicEnabled());
setLargestUsedVersionNumber(store.getLargestUsedVersionNumber());
setMigrating(store.isMigrating());
setWriteComputationEnabled(store.isWriteComputationEnabled());
Expand Down Expand Up @@ -544,6 +545,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
this.storeProperties.incrementalPushEnabled = incrementalPushEnabled;
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return this.storeProperties.separateRealTimeTopicEnabled;
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
this.storeProperties.separateRealTimeTopicEnabled = separateRealTimeTopicEnabled;
}

/**
* @deprecated The store level accessControlled flag is no longer valid to be used to skip ACL checks.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public enum AvroProtocolDefinition {
*
* TODO: Move AdminOperation to venice-common module so that we can properly reference it here.
*/
ADMIN_OPERATION(80, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),
ADMIN_OPERATION(81, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),

/**
* Single chunk of a large multi-chunk value. Just a bunch of bytes.
Expand Down Expand Up @@ -143,7 +143,7 @@ public enum AvroProtocolDefinition {
/**
* Value schema for metadata system store.
*/
METADATA_SYSTEM_SCHEMA_STORE(23, StoreMetaValue.class),
METADATA_SYSTEM_SCHEMA_STORE(24, StoreMetaValue.class),

/**
* Key schema for push status system store.
Expand Down
Loading

0 comments on commit 670f703

Please sign in to comment.