Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[da-vinci] Add delayed ingestion in dvc for target region push #1510

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package com.linkedin.davinci;

import com.linkedin.davinci.config.StoreBackendConfig;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.ConcurrentRef;
import com.linkedin.venice.utils.ReferenceCounted;
import com.linkedin.venice.utils.RegionUtils;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -244,9 +248,31 @@ synchronized void trySubscribeDaVinciFutureVersion() {
} else {
return;
}
LOGGER.info("Subscribing to future version {}", targetVersion.kafkaTopicName());
setDaVinciFutureVersion(new VersionBackend(backend, targetVersion, stats));
daVinciFutureVersion.subscribe(subscription).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));

Set<String> targetRegions = RegionUtils.parseRegionsFilterList(targetVersion.getTargetSwapRegion());
VeniceServerConfig veniceServerConfig = backend.getConfigLoader().getVeniceServerConfig();
String currentRegion = veniceServerConfig.getRegionName();
boolean isTargetRegionEnabled = !StringUtils.isEmpty(targetVersion.getTargetSwapRegion());
boolean startIngestionInNonTargetRegion = isTargetRegionEnabled && !targetRegions.contains(currentRegion)
&& targetVersion.getStatus() == VersionStatus.ONLINE;

// Subscribe to the future version if:
// 1. Target region push with delayed ingestion is not enabled
// 2. Target region push with delayed ingestion is enabled and the current region is a target region
// 3. Target region push with delayed ingestion is enabled and the current region is a non target region
// and the wait time has elapsed. The wait time has elapsed when the version status is marked ONLINE
if (targetRegions.contains(currentRegion) || startIngestionInNonTargetRegion || !isTargetRegionEnabled) {
LOGGER.info("Subscribing to future version {}", targetVersion.kafkaTopicName());
setDaVinciFutureVersion(new VersionBackend(backend, targetVersion, stats));
daVinciFutureVersion.subscribe(subscription).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));
} else {
LOGGER.info(
"Skipping subscribe to future version: {} in region: {} because the target version status is: {} and the target regions are: {}",
targetVersion.kafkaTopicName(),
currentRegion,
targetVersion.getStatus(),
targetVersion.getTargetSwapRegion());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.PropertyBuilder;
Expand Down Expand Up @@ -77,6 +78,7 @@ void setUp() {
.put(ConfigKeys.ZOOKEEPER_ADDRESS, "test-zookeeper")
.put(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, "test-kafka")
.put(ConfigKeys.DATA_BASE_PATH, baseDataPath.getAbsolutePath())
.put(ConfigKeys.LOCAL_REGION_NAME, "dc-0")
.build();

ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
Expand Down Expand Up @@ -417,4 +419,51 @@ void testRollbackAndRollForward() {
}
});
}

@Test
public void testSubscribeWithDelayedIngestionEnabled() throws Exception {
// delayed ingestion is not enabled; no target regions are set
CompletableFuture subscribeResult = storeBackend.subscribe(ComplementSet.of(0));
versionMap.get(version1.kafkaTopicName()).completePartition(0);
subscribeResult.get(3, TimeUnit.SECONDS);
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber());
}

versionMap.get(version2.kafkaTopicName()).completePartition(0);
store.setCurrentVersion(version2.getNumber());
backend.handleStoreChanged(storeBackend);
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version2.getNumber());
}

// delayed ingestion is enabled, target region is the current region
store.setTargetSwapRegion("dc-0");
Version version3 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15);
store.addVersion(version3);
backend.handleStoreChanged(storeBackend);

store.setCurrentVersion(version3.getNumber());
versionMap.get(version3.kafkaTopicName()).completePartition(0);
backend.handleStoreChanged(storeBackend);
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version3.getNumber());
}

// delayed ingestion is enabled, target region is not the current region
store.setTargetSwapRegion("dc-1");
Version version4 = new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, 15);
store.addVersion(version4);
backend.handleStoreChanged(storeBackend);

store.setCurrentVersion(version4.getNumber());
store.updateVersionStatus(version4.getNumber(), VersionStatus.ONLINE);
backend.handleStoreChanged(storeBackend);

versionMap.get(version4.kafkaTopicName()).completePartition(0);
backend.handleStoreChanged(storeBackend);
try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
assertEquals(versionRef.get().getVersion().getNumber(), version4.getNumber());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2442,6 +2442,8 @@ private ConfigKeys() {
public static final String CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS = "controller.deferred.version.swap.sleep.ms";
public static final String CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED =
"controller.deferred.version.swap.service.enabled";
public static final String DEFERRED_VERSION_SWAP_SERVICE_WITH_DVC_CHECK_ENABLED =
"deferred.version.swap.service.with.dvc.check.enabled";

/*
* Both Router and Server will maintain an in-memory cache for connection-level ACLs and the following config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,6 @@ private void addVersion(Version version, boolean checkDisableWrite, boolean isCl

version.setTargetSwapRegionWaitTime(getTargetSwapRegionWaitTime());

version.setIsDavinciHeartbeatReported(getIsDavinciHeartbeatReported());

HybridStoreConfig hybridStoreConfig = getHybridStoreConfig();
if (hybridStoreConfig != null) {
version.setHybridStoreConfig(hybridStoreConfig.clone());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.DEFERRED_VERSION_SWAP_SERVICE_WITH_DVC_CHECK_ENABLED;
import static com.linkedin.venice.ConfigKeys.LOCAL_REGION_NAME;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_LIST;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.integration.utils.DaVinciTestContext;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.StoreInfo;
Expand All @@ -19,8 +31,10 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
Expand All @@ -40,11 +54,14 @@ public class TestDeferredVersionSwap {
private static final String[] CLUSTER_NAMES =
IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new);

private static final int TEST_TIMEOUT = 120_000;

@BeforeClass
public void setUp() {
Properties controllerProps = new Properties();
controllerProps.put(CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS, 30000);
controllerProps.put(CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED, true);
controllerProps.put(DEFERRED_VERSION_SWAP_SERVICE_WITH_DVC_CHECK_ENABLED, false);
Properties serverProperties = new Properties();

VeniceMultiRegionClusterCreateOptions.Builder optionsBuilder =
Expand All @@ -68,7 +85,7 @@ public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
}

@Test
@Test(timeOut = TEST_TIMEOUT)
public void testDeferredVersionSwap() throws IOException {
File inputDir = getTempDataDirectory();
TestWriteUtils.writeSimpleAvroFileWithStringToV3Schema(inputDir, 100, 100);
Expand All @@ -80,14 +97,14 @@ public void testDeferredVersionSwap() throws IOException {
String keySchemaStr = "\"string\"";
UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setUnusedSchemaDeletionEnabled(true);
storeParms.setTargetRegionSwapWaitTime(1);
storeParms.setTargetRegionSwap(TARGET_REGION);
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();

try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAMES[0], parentControllerURLs)) {
createStoreForJob(CLUSTER_NAMES[0], keySchemaStr, NAME_RECORD_V3_SCHEMA.toString(), props, storeParms).close();

// Start push job with target region push enabled
props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);
props.put(TARGETED_REGION_PUSH_LIST, TARGET_REGION);
TestWriteUtils.runPushJob("Test push job", props);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(storeName, 1),
Expand Down Expand Up @@ -131,4 +148,137 @@ public void testDeferredVersionSwap() throws IOException {
}
}

@Test(timeOut = TEST_TIMEOUT * 2)
public void testDvcDelayedIngestionWithTargetRegion() throws Exception {
// Setup job properties
UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setUnusedSchemaDeletionEnabled(true);
storeParms.setTargetRegionSwapWaitTime(1);
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
String keySchemaStr = "\"int\"";
String valueSchemaStr = "\"int\"";

// Create store + start a normal push
int keyCount = 100;
File inputDir = getTempDataDirectory();
TestWriteUtils.writeSimpleAvroFileWithIntToIntSchema(inputDir, keyCount);
String inputDirPath = "file://" + inputDir.getAbsolutePath();
String storeName = Utils.getUniqueString("testDvcDelayedIngestionWithTargetRegion");
Properties props =
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAMES[0], parentControllerURLs)) {
createStoreForJob(CLUSTER_NAMES[0], keySchemaStr, valueSchemaStr, props, storeParms).close();
TestWriteUtils.runPushJob("Test push job", props);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(storeName, 1),
parentControllerClient,
30,
TimeUnit.SECONDS);

// Version should only be swapped in all regions
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
Assert.assertEquals((int) version, 1);
});
});
}

// Create dvc client in target region
List<VeniceMultiClusterWrapper> childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
VeniceClusterWrapper cluster1 = childDatacenters.get(0).getClusters().get(CLUSTER_NAMES[0]);
VeniceProperties backendConfig = DaVinciTestContext.getDaVinciPropertyBuilder(cluster1.getZk().getAddress())
.put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
.put(LOCAL_REGION_NAME, TARGET_REGION)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
DaVinciClient<Object, Object> client1 =
ServiceFactory.getGenericAvroDaVinciClient(storeName, cluster1, new DaVinciConfig(), backendConfig);
client1.subscribeAll().get();

// Check that v1 is ingested
for (int i = 1; i <= keyCount; i++) {
assertNotNull(client1.get(i).get());
}

// Do another push with target region enabled
int keyCount2 = 200;
File inputDir2 = getTempDataDirectory();
String inputDirPath2 = "file://" + inputDir2.getAbsolutePath();
TestWriteUtils.writeSimpleAvroFileWithIntToIntSchema(inputDir2, keyCount2);
Properties props2 =
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath2, storeName);
try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAMES[0], parentControllerURLs)) {
props2.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);
props2.put(TARGETED_REGION_PUSH_LIST, TARGET_REGION);
TestWriteUtils.runPushJob("Test push job", props2);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(storeName, 2),
parentControllerClient,
30,
TimeUnit.SECONDS);

// Version should only be swapped in the target region
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
if (colo.equals(TARGET_REGION)) {
Assert.assertEquals((int) version, 2);
} else {
Assert.assertEquals((int) version, 1);
}
});
});

// Data should be automatically ingested in target region for dvc
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
for (int i = 101; i <= keyCount2; i++) {
assertNotNull(client1.get(i).get());
}
});

// Close dvc client in target region
client1.close();

// Create dvc client in non target region
VeniceClusterWrapper cluster2 = childDatacenters.get(1).getClusters().get(CLUSTER_NAMES[0]);
VeniceProperties backendConfig2 = DaVinciTestContext.getDaVinciPropertyBuilder(cluster2.getZk().getAddress())
.put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
.put(LOCAL_REGION_NAME, "dc-1")
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
DaVinciClient<Object, Object> client2 =
ServiceFactory.getGenericAvroDaVinciClient(storeName, cluster2, new DaVinciConfig(), backendConfig2);
client2.subscribeAll().get();

// Check that v2 is not ingested
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
for (int i = 101; i <= keyCount2; i++) {
assertNull(client2.get(i).get());
}
});

// Version should be swapped in all regions
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
Assert.assertEquals((int) version, 2);
});
});

// Check that v2 is ingested in dvc non target region
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
for (int i = 101; i <= keyCount2; i++) {
assertNotNull(client2.get(i).get());
}
});

client2.close();
}
}
}
Loading
Loading