Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[da-vinci] Add delayed ingestion in dvc for target region push #1510

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.ConcurrentRef;
import com.linkedin.venice.utils.ReferenceCounted;
import com.linkedin.venice.utils.RegionUtils;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -240,9 +243,24 @@ synchronized void trySubscribeDaVinciFutureVersion() {
} else {
return;
}
LOGGER.info("Subscribing to future version {}", targetVersion.kafkaTopicName());
setDaVinciFutureVersion(new VersionBackend(backend, targetVersion, stats));
daVinciFutureVersion.subscribe(subscription).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));

Store store = backend.getStoreRepository().getStoreOrThrow(storeName);
Set<String> targetRegions = RegionUtils.parseRegionsFilterList(store.getTargetSwapRegion());
String currentRegion = backend.getConfigLoader().getVeniceServerConfig().getRegionName();
boolean isTargetRegionEnabled = !StringUtils.isEmpty(store.getTargetSwapRegion());
boolean startIngestionInNonTargetRegion =
isTargetRegionEnabled && targetVersion.getStatus() == VersionStatus.ONLINE;

// Subscribe to the future version if:
// 1. Target region push with delayed ingestion is not enabled
// 2. Target region push with delayed ingestion is enabled and the current region is a target region
// 3. Target region push with delayed ingestion is enabled and the current region is a non target region
// and the wait time has elapsed. The wait time has elapsed when the version status is marked ONLINE
if (targetRegions.contains(currentRegion) || startIngestionInNonTargetRegion || !isTargetRegionEnabled) {
LOGGER.info("Subscribing to future version {}", targetVersion.kafkaTopicName());
setDaVinciFutureVersion(new VersionBackend(backend, targetVersion, stats));
daVinciFutureVersion.subscribe(subscription).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.LOCAL_REGION_NAME;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.integration.utils.DaVinciTestContext;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiRegionClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.StoreInfo;
Expand All @@ -19,12 +29,16 @@
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -40,6 +54,9 @@ public class TestDeferredVersionSwap {
private static final String[] CLUSTER_NAMES =
IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new);

private static final int TEST_TIMEOUT = 120_000;
private static final Logger LOGGER = LogManager.getLogger(TestDeferredVersionSwap.class);

@BeforeClass
public void setUp() {
Properties controllerProps = new Properties();
Expand Down Expand Up @@ -68,7 +85,7 @@ public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
}

@Test
@Test(timeOut = TEST_TIMEOUT)
public void testDeferredVersionSwap() throws IOException {
File inputDir = getTempDataDirectory();
TestWriteUtils.writeSimpleAvroFileWithStringToV3Schema(inputDir, 100, 100);
Expand Down Expand Up @@ -131,4 +148,140 @@ public void testDeferredVersionSwap() throws IOException {
}
}

@Test(timeOut = TEST_TIMEOUT * 2)
public void testDvcDelayedIngestionWithTargetRegion() throws Exception {
// Setup job properties
UpdateStoreQueryParams storeParms = new UpdateStoreQueryParams().setUnusedSchemaDeletionEnabled(true);
storeParms.setTargetRegionSwapWaitTime(1);
storeParms.setTargetRegionSwap(TARGET_REGION);
String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
String keySchemaStr = "\"int\"";
String valueSchemaStr = "\"int\"";

// Create store + start a normal push
int keyCount = 100;
File inputDir = getTempDataDirectory();
TestWriteUtils.writeSimpleAvroFileWithIntToIntSchema(inputDir, keyCount);
String inputDirPath = "file://" + inputDir.getAbsolutePath();
String storeName = Utils.getUniqueString("testDvcDelayedIngestionWithTargetRegion");
Properties props =
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAMES[0], parentControllerURLs)) {
createStoreForJob(CLUSTER_NAMES[0], keySchemaStr, valueSchemaStr, props, storeParms).close();
LOGGER.info("DvcDeferredVersionSwap starting normal push job");
TestWriteUtils.runPushJob("Test push job", props);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(storeName, 1),
parentControllerClient,
30,
TimeUnit.SECONDS);

// Version should only be swapped in all regions
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
Assert.assertEquals((int) version, 1);
});
});
}

// Create dvc client in target region
List<VeniceMultiClusterWrapper> childDatacenters = multiRegionMultiClusterWrapper.getChildRegions();
VeniceClusterWrapper cluster1 = childDatacenters.get(0).getClusters().get(CLUSTER_NAMES[0]);
VeniceProperties backendConfig = DaVinciTestContext.getDaVinciPropertyBuilder(cluster1.getZk().getAddress())
.put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
.put(LOCAL_REGION_NAME, TARGET_REGION)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
DaVinciClient<Object, Object> client1 =
ServiceFactory.getGenericAvroDaVinciClient(storeName, cluster1, new DaVinciConfig(), backendConfig);
client1.subscribeAll().get();

// Check that v1 is ingested
for (int i = 1; i <= keyCount; i++) {
assertNotNull(client1.get(i).get());
}

// Do another push with target region enabled
int keyCount2 = 200;
File inputDir2 = getTempDataDirectory();
String inputDirPath2 = "file://" + inputDir2.getAbsolutePath();
TestWriteUtils.writeSimpleAvroFileWithIntToIntSchema(inputDir2, keyCount2);
Properties props2 =
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath2, storeName);
try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAMES[0], parentControllerURLs)) {
props2.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);
LOGGER.info("DvcDeferredVersionSwap starting target region push job");
TestWriteUtils.runPushJob("Test push job", props2);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(storeName, 2),
parentControllerClient,
30,
TimeUnit.SECONDS);

// Version should only be swapped in the target region
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
if (colo.equals(TARGET_REGION)) {
Assert.assertEquals((int) version, 2);
} else {
Assert.assertEquals((int) version, 1);
}
});
});

// Data should be automatically ingested in target region for dvc
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
for (int i = 101; i <= keyCount2; i++) {
assertNotNull(client1.get(i).get());
}
});

// Close dvc client in target region
client1.close();

// Create dvc client in non target region
VeniceClusterWrapper cluster2 = childDatacenters.get(1).getClusters().get(CLUSTER_NAMES[0]);
VeniceProperties backendConfig2 = DaVinciTestContext.getDaVinciPropertyBuilder(cluster2.getZk().getAddress())
.put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
.put(LOCAL_REGION_NAME, "dc-1")
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();
DaVinciClient<Object, Object> client2 =
ServiceFactory.getGenericAvroDaVinciClient(storeName, cluster2, new DaVinciConfig(), backendConfig2);
client2.subscribeAll().get();

// Check that v2 is not ingested
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
for (int i = 101; i <= keyCount2; i++) {
assertNull(client2.get(i).get());
}
});

// Version should be swapped in all regions
LOGGER.info("DvcDeferredVersionSwap check that target region push is complete");
TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, () -> {
Map<String, Integer> coloVersions =
parentControllerClient.getStore(storeName).getStore().getColoToCurrentVersions();

coloVersions.forEach((colo, version) -> {
Assert.assertEquals((int) version, 2);
});
});

// Check that v2 is ingested in dvc non target region
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
for (int i = 101; i <= keyCount2; i++) {
assertNotNull(client2.get(i).get());
}
});

client2.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.linkedin.venice.controller.stats.DeferredVersionSwapStats;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.meta.ReadWriteStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
Expand All @@ -21,7 +18,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -79,17 +75,6 @@ private Set<String> getRegionsForVersionSwap(Map<String, Integer> candidateRegio
return remainingRegions;
}

private String getTargetRegion(Set<String> targetRegions) {
return targetRegions.iterator().next();
}

private StoreResponse getStoreForRegion(String clusterName, String targetRegion, String storeName) {
Map<String, ControllerClient> controllerClientMap =
veniceParentHelixAdmin.getVeniceHelixAdmin().getControllerClientMap(clusterName);
ControllerClient targetRegionControllerClient = controllerClientMap.get(targetRegion);
return targetRegionControllerClient.getStore(storeName);
}

private boolean didWaitTimeElapseInTargetRegions(
Map<String, Long> completionTimes,
Set<String> targetRegions,
Expand Down Expand Up @@ -146,7 +131,7 @@ public void run() {
storePushCompletionTimes,
targetRegions,
store.getTargetSwapRegionWaitTime())) {
LOGGER.info(
LOGGER.debug(
"Skipping version swap for store: {} on version: {} as wait time: {} has not passed",
storeName,
targetVersionNum,
Expand All @@ -159,34 +144,6 @@ public void run() {
veniceParentHelixAdmin.getCurrentVersionsForMultiColos(cluster, storeName);
Set<String> remainingRegions = getRegionsForVersionSwap(coloToVersions, targetRegions);

StoreResponse targetRegionStoreResponse =
getStoreForRegion(cluster, getTargetRegion(targetRegions), storeName);
if (targetRegionStoreResponse.isError()) {
LOGGER.warn("Got error when fetching targetRegionStore: {}", targetRegionStoreResponse.getError());
continue;
}

StoreInfo targetRegionStore = targetRegionStoreResponse.getStore();
Optional<Version> version = targetRegionStore.getVersion(targetVersionNum);
if (!version.isPresent()) {
LOGGER.warn(
"Unable to find version {} for store: {} in regions: {}",
targetVersionNum,
storeName,
store.getTargetSwapRegion());
continue;
}

// Do not perform version swap for davinci stores
// TODO remove this check once DVC delayed ingestion is completed
if (version.get().getIsDavinciHeartbeatReported()) {
LOGGER.info(
"Skipping version swap for store: {} on version: {} as it is davinci",
storeName,
targetVersionNum);
continue;
}

// Check that push is completed in target regions
Admin.OfflinePushStatusInfo pushStatusInfo =
veniceParentHelixAdmin.getOffLinePushStatus(cluster, kafkaTopicName);
Expand Down Expand Up @@ -269,7 +226,7 @@ public void run() {
store.getTargetSwapRegionWaitTime());

if (!didWaitTimeElapseInTargetRegions) {
LOGGER.info(
LOGGER.debug(
"Skipping version swap for store: {} on version: {} as wait time: {} has not passed",
storeName,
targetVersionNum,
Expand Down
Loading
Loading