Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller][compat] Controller part change for supporting separate real-time topic functionality for hybrid stores. #1172

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,7 @@ static UpdateStoreQueryParams getUpdateStoreQueryParams(CommandLine cmd) {
integerParam(cmd, Arg.BATCH_GET_LIMIT, p -> params.setBatchGetLimit(p), argSet);
integerParam(cmd, Arg.NUM_VERSIONS_TO_PRESERVE, p -> params.setNumVersionsToPreserve(p), argSet);
booleanParam(cmd, Arg.INCREMENTAL_PUSH_ENABLED, p -> params.setIncrementalPushEnabled(p), argSet);
booleanParam(cmd, Arg.SEPARATE_REALTIME_TOPIC_ENABLED, p -> params.setSeparateRealTimeTopicEnabled(p), argSet);
booleanParam(cmd, Arg.WRITE_COMPUTATION_ENABLED, p -> params.setWriteComputationEnabled(p), argSet);
booleanParam(cmd, Arg.READ_COMPUTATION_ENABLED, p -> params.setReadComputationEnabled(p), argSet);
integerParam(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public enum Arg {
),
INCREMENTAL_PUSH_ENABLED(
"incremental-push-enabled", "ipe", true, "a flag to see if the store supports incremental push or not"
),
SEPARATE_REALTIME_TOPIC_ENABLED(
"separate-realtime-topic-enabled", "srte", true,
"a flag to see if the store supports separate real-time topic or not"
), BATCH_GET_LIMIT("batch-get-limit", "bgl", true, "Key number limit inside one batch-get request"),
NUM_VERSIONS_TO_PRESERVE("num-versions-to-preserve", "nvp", true, "Number of version that store should preserve."),
KAFKA_BOOTSTRAP_SERVERS("kafka-bootstrap-servers", "kbs", true, "Kafka bootstrap server URL(s)"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import static com.linkedin.venice.Arg.REPLICATION_FACTOR;
import static com.linkedin.venice.Arg.RETRY;
import static com.linkedin.venice.Arg.RMD_CHUNKING_ENABLED;
import static com.linkedin.venice.Arg.SEPARATE_REALTIME_TOPIC_ENABLED;
import static com.linkedin.venice.Arg.SERVER_KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND;
import static com.linkedin.venice.Arg.SERVER_URL;
import static com.linkedin.venice.Arg.SKIP_DIV;
Expand Down Expand Up @@ -267,7 +268,7 @@ public enum Command {
ACTIVE_ACTIVE_REPLICATION_ENABLED, REGIONS_FILTER, DISABLE_META_STORE, DISABLE_DAVINCI_PUSH_STATUS_STORE,
STORAGE_PERSONA, STORE_VIEW_CONFIGS, LATEST_SUPERSET_SCHEMA_ID, MIN_COMPACTION_LAG_SECONDS,
MAX_COMPACTION_LAG_SECONDS, MAX_RECORD_SIZE_BYTES, MAX_NEARLINE_RECORD_SIZE_BYTES,
UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED }
UNUSED_SCHEMA_DELETION_ENABLED, BLOB_TRANSFER_ENABLED, SEPARATE_REALTIME_TOPIC_ENABLED }
),
UPDATE_CLUSTER_CONFIG(
"update-cluster-config", "Update live cluster configs", new Arg[] { URL, CLUSTER },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,14 @@ private ConfigKeys() {
*/
public static final String ENABLE_INCREMENTAL_PUSH_FOR_HYBRID_ACTIVE_ACTIVE_USER_STORES =
"enable.incremental.push.for.hybrid.active.active.user.stores";

/**
* We will use this config to determine whether we should enable separate real-time topic for incremental push enabled stores.
* If this config is set to true, we will enable separate real-time topic for incremental push enabled stores.
*/
public static final String ENABLE_SEPARATE_REAL_TIME_TOPIC_FOR_STORE_WITH_INCREMENTAL_PUSH =
"enable.separate.real.time.topic.for.store.with.incremental.push";
haoxu07 marked this conversation as resolved.
Show resolved Hide resolved

/**
* We will use this config to determine whether we should enable partial update for hybrid active-active user stores.
* If this config is set to true, we will enable partial update for hybrid active-active user stores whose latest value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class ControllerApiConstants {
public static final String CHUNKING_ENABLED = "chunking_enabled";
public static final String RMD_CHUNKING_ENABLED = "rmd_chunking_enabled";
public static final String INCREMENTAL_PUSH_ENABLED = "incremental_push_enabled";
public static final String SEPARATE_REAL_TIME_TOPIC_ENABLED = "separate_realtime_topic_enabled";
public static final String SINGLE_GET_ROUTER_CACHE_ENABLED = "single_get_router_cache_enabled";
public static final String BATCH_GET_ROUTER_CACHE_ENABLED = "batch_get_router_cache_enabled";
public static final String BATCH_GET_LIMIT = "batch_get_limit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPLICATION_METADATA_PROTOCOL_VERSION_ID;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REWIND_TIME_IN_SECONDS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.RMD_CHUNKING_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.SEPARATE_REAL_TIME_TOPIC_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_NODE_READ_QUOTA_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORAGE_QUOTA_IN_BYTE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_MIGRATION;
Expand Down Expand Up @@ -406,6 +407,14 @@ public Optional<Boolean> getIncrementalPushEnabled() {
return getBoolean(INCREMENTAL_PUSH_ENABLED);
}

public UpdateStoreQueryParams setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
return putBoolean(SEPARATE_REAL_TIME_TOPIC_ENABLED, separateRealTimeTopicEnabled);
}

public Optional<Boolean> getSeparateRealTimeTopicEnabled() {
return getBoolean(SEPARATE_REAL_TIME_TOPIC_ENABLED);
}

public UpdateStoreQueryParams setBatchGetLimit(int batchGetLimit) {
return putInteger(BATCH_GET_LIMIT, batchGetLimit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ private void addVersion(Version version, boolean checkDisableWrite, boolean isCl

version.setIncrementalPushEnabled(isIncrementalPushEnabled());

version.setSeparateRealTimeTopicEnabled(isSeparateRealTimeTopicEnabled());

version.setBlobTransferEnabled(isBlobTransferEnabled());

version.setUseVersionLevelIncrementalPushEnabled(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return this.delegate.isSeparateRealTimeTopicEnabled();
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isBlobTransferEnabled() {
return this.delegate.isBlobTransferEnabled();
Expand Down Expand Up @@ -955,6 +965,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return this.delegate.isSeparateRealTimeTopicEnabled();
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
throw new UnsupportedOperationException();
}

@Override
public boolean isAccessControlled() {
return this.delegate.isAccessControlled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ static boolean isSystemStore(String storeName) {

void setIncrementalPushEnabled(boolean incrementalPushEnabled);

boolean isSeparateRealTimeTopicEnabled();

void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled);

boolean isAccessControlled();

void setAccessControlled(boolean accessControlled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
throwUnsupportedOperationException("setIncrementalPushEnabled");
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return zkSharedStore.isSeparateRealTimeTopicEnabled();
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
throwUnsupportedOperationException("setSeparateRealTimeTopicEnabled");
}

@Override
public boolean isAccessControlled() {
return zkSharedStore.isAccessControlled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface Version extends Comparable<Version>, DataModelBackedStructure<S
String VERSION_SEPARATOR = "_v";
String REAL_TIME_TOPIC_SUFFIX = "_rt";
String STREAM_REPROCESSING_TOPIC_SUFFIX = "_sr";

String SEPARATE_REAL_TIME_TOPIC_SUFFIX = "_rt_sep";
/**
* Special number indicating no replication metadata version is set.
*/
Expand Down Expand Up @@ -162,6 +162,10 @@ default void setLeaderFollowerModelEnabled(boolean leaderFollowerModelEnabled) {

void setIncrementalPushEnabled(boolean incrementalPushEnabled);

boolean isSeparateRealTimeTopicEnabled();

void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled);

boolean isBlobTransferEnabled();

void setBlobTransferEnabled(boolean blobTransferEnabled);
Expand Down Expand Up @@ -289,6 +293,10 @@ static String composeRealTimeTopic(String storeName) {
return storeName + REAL_TIME_TOPIC_SUFFIX;
}

static String composeSeparateRealTimeTopic(String storeName) {
return storeName + SEPARATE_REAL_TIME_TOPIC_SUFFIX;
}

static String composeStreamReprocessingTopic(String storeName, int versionNumber) {
return composeKafkaTopic(storeName, versionNumber) + STREAM_REPROCESSING_TOPIC_SUFFIX;
}
Expand All @@ -308,7 +316,10 @@ static String parseStoreFromRealTimeTopic(String kafkaTopic) {
if (!isRealTimeTopic(kafkaTopic)) {
throw new VeniceException("Kafka topic: " + kafkaTopic + " is not a real-time topic");
}
return kafkaTopic.substring(0, kafkaTopic.length() - REAL_TIME_TOPIC_SUFFIX.length());
if (kafkaTopic.endsWith(REAL_TIME_TOPIC_SUFFIX)) {
return kafkaTopic.substring(0, kafkaTopic.length() - REAL_TIME_TOPIC_SUFFIX.length());
}
return kafkaTopic.substring(0, kafkaTopic.length() - SEPARATE_REAL_TIME_TOPIC_SUFFIX.length());
}

static String parseStoreFromStreamReprocessingTopic(String kafkaTopic) {
Expand Down Expand Up @@ -337,7 +348,7 @@ static String parseStoreFromKafkaTopicName(String kafkaTopic) {
}

static boolean isRealTimeTopic(String kafkaTopic) {
return kafkaTopic.endsWith(REAL_TIME_TOPIC_SUFFIX);
return kafkaTopic.endsWith(REAL_TIME_TOPIC_SUFFIX) || kafkaTopic.endsWith(SEPARATE_REAL_TIME_TOPIC_SUFFIX);
}

static boolean isStreamReprocessingTopic(String kafkaTopic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
this.storeVersion.incrementalPushEnabled = incrementalPushEnabled;
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return storeVersion.separateRealTimeTopicEnabled;
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
this.storeVersion.setSeparateRealTimeTopicEnabled(separateRealTimeTopicEnabled);
}

@Override
public boolean isBlobTransferEnabled() {
return this.storeVersion.blobTransferEnabled;
Expand Down Expand Up @@ -450,6 +460,7 @@ public Version cloneVersion() {
clonedVersion.setReplicationFactor(getReplicationFactor());
clonedVersion.setNativeReplicationSourceFabric(getNativeReplicationSourceFabric());
clonedVersion.setIncrementalPushEnabled(isIncrementalPushEnabled());
clonedVersion.setSeparateRealTimeTopicEnabled(isSeparateRealTimeTopicEnabled());
clonedVersion.setUseVersionLevelIncrementalPushEnabled(isUseVersionLevelIncrementalPushEnabled());
clonedVersion.setHybridStoreConfig(getHybridStoreConfig());
clonedVersion.setUseVersionLevelHybridConfig(isUseVersionLevelHybridConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ public ZKStore(Store store) {
setBatchGetLimit(store.getBatchGetLimit());
setNumVersionsToPreserve(store.getNumVersionsToPreserve());
setIncrementalPushEnabled(store.isIncrementalPushEnabled());
setSeparateRealTimeTopicEnabled(store.isSeparateRealTimeTopicEnabled());
setLargestUsedVersionNumber(store.getLargestUsedVersionNumber());
setMigrating(store.isMigrating());
setWriteComputationEnabled(store.isWriteComputationEnabled());
Expand Down Expand Up @@ -544,6 +545,16 @@ public void setIncrementalPushEnabled(boolean incrementalPushEnabled) {
this.storeProperties.incrementalPushEnabled = incrementalPushEnabled;
}

@Override
public boolean isSeparateRealTimeTopicEnabled() {
return this.storeProperties.separateRealTimeTopicEnabled;
}

@Override
public void setSeparateRealTimeTopicEnabled(boolean separateRealTimeTopicEnabled) {
this.storeProperties.separateRealTimeTopicEnabled = separateRealTimeTopicEnabled;
}

/**
* @deprecated The store level accessControlled flag is no longer valid to be used to skip ACL checks.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public enum AvroProtocolDefinition {
*
* TODO: Move AdminOperation to venice-common module so that we can properly reference it here.
*/
ADMIN_OPERATION(80, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),
ADMIN_OPERATION(81, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),

/**
* Single chunk of a large multi-chunk value. Just a bunch of bytes.
Expand Down Expand Up @@ -143,7 +143,7 @@ public enum AvroProtocolDefinition {
/**
* Value schema for metadata system store.
*/
METADATA_SYSTEM_SCHEMA_STORE(23, StoreMetaValue.class),
METADATA_SYSTEM_SCHEMA_STORE(24, StoreMetaValue.class),

/**
* Key schema for push status system store.
Expand Down
Loading
Loading