Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the segment operation throttle defaults to Integer.MAX_VALUE #15126

Merged
merged 3 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,14 @@ public void testServingQueriesDisabledWithAcquireRelease()
? Integer.parseInt(
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES)
: Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES);
// Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too
// long to finish
int numPermitsToTake = 10000;
// We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally
// preprocessing more segments is acceptable and cannot affect the query performance
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery);
Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery);
for (int i = 0; i < defaultPermitsBeforeQuery; i++) {
for (int i = 0; i < numPermitsToTake; i++) {
operationsThrottler.acquire();
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery);
Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1);
Expand All @@ -322,12 +325,11 @@ public void testServingQueriesDisabledWithAcquireRelease()
operationsThrottler.startServingQueries();
Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits);
Assert.assertEquals(operationsThrottler.availablePermits(),
initialPermits - defaultPermitsBeforeQuery);
initialPermits - numPermitsToTake);

for (int i = 0; i < defaultPermitsBeforeQuery; i++) {
for (int i = 0; i < numPermitsToTake; i++) {
operationsThrottler.release();
Assert.assertEquals(operationsThrottler.availablePermits(),
(initialPermits - defaultPermitsBeforeQuery) + i + 1);
Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake) + i + 1);
}
Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits);
Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits);
Expand All @@ -340,11 +342,11 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigIncrease()
int initialPermits = 4;
List<BaseSegmentOperationsThrottler> segmentOperationsThrottlerList = new ArrayList<>();
segmentOperationsThrottlerList.add(new SegmentAllIndexPreprocessThrottler(initialPermits, Integer.parseInt(
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES), false));
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false));
segmentOperationsThrottlerList.add(new SegmentStarTreePreprocessThrottler(initialPermits, Integer.parseInt(
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES), false));
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false));
segmentOperationsThrottlerList.add(new SegmentDownloadThrottler(initialPermits, Integer.parseInt(
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES), false));
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES) - 5, false));

for (BaseSegmentOperationsThrottler operationsThrottler : segmentOperationsThrottlerList) {
int defaultPermitsBeforeQuery = operationsThrottler instanceof SegmentAllIndexPreprocessThrottler
Expand All @@ -353,14 +355,17 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigIncrease()
? Integer.parseInt(
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES)
: Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES);
// Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too
// long to finish
int numPermitsToTake = 10000;
// We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally
// preprocessing more segments is acceptable and cannot affect the query performance
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery);
Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery);
for (int i = 0; i < defaultPermitsBeforeQuery; i++) {
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery - 5);
Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - 5);
for (int i = 0; i < numPermitsToTake; i++) {
operationsThrottler.acquire();
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery);
Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1);
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery - 5);
Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1 - 5);
}

// Double the permits for before serving queries config
Expand All @@ -370,29 +375,29 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigIncrease()
: operationsThrottler instanceof SegmentStarTreePreprocessThrottler
? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES
: CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES,
String.valueOf(defaultPermitsBeforeQuery * 2));
String.valueOf(defaultPermitsBeforeQuery));
operationsThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs);
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery * 2);
// We doubled permits but took all of the previous ones
Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery);
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery);
// We increased permits but took some before the increase
Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - numPermitsToTake);

// Take remaining permits
for (int i = 0; i < defaultPermitsBeforeQuery; i++) {
// Take more permits
for (int i = 0; i < numPermitsToTake; i++) {
operationsThrottler.acquire();
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery * 2);
Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1);
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery);
Assert.assertEquals(operationsThrottler.availablePermits(),
defaultPermitsBeforeQuery - numPermitsToTake - i - 1);
}

// Once the server is ready to server queries, we should reset the throttling configurations to be as configured
operationsThrottler.startServingQueries();
Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits);
Assert.assertEquals(operationsThrottler.availablePermits(),
initialPermits - (defaultPermitsBeforeQuery * 2));
initialPermits - (numPermitsToTake * 2));

for (int i = 0; i < defaultPermitsBeforeQuery * 2; i++) {
for (int i = 0; i < numPermitsToTake * 2; i++) {
operationsThrottler.release();
Assert.assertEquals(operationsThrottler.availablePermits(),
(initialPermits - defaultPermitsBeforeQuery * 2) + i + 1);
Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake * 2) + i + 1);
}
Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits);
Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits);
Expand All @@ -418,39 +423,41 @@ public void testServingQueriesDisabledWithAcquireReleaseWithConfigDecrease()
? Integer.parseInt(
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES)
: Integer.parseInt(CommonConstants.Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES);
// Default is too high: Integer.MAX_VALUE, take a limited number of permits so that the test doesn't take too
// long to finish
int numPermitsToTake = 10000;
// We set isServingQueries to false when the server is not yet ready to server queries. In this scenario ideally
// preprocessing more segments is acceptable and cannot affect the query performance
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery);
Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery);
for (int i = 0; i < defaultPermitsBeforeQuery; i++) {
for (int i = 0; i < numPermitsToTake; i++) {
operationsThrottler.acquire();
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery);
Assert.assertEquals(operationsThrottler.availablePermits(), defaultPermitsBeforeQuery - i - 1);
}

// Half the permits for before serving queries config
Map<String, String> updatedClusterConfigs = new HashMap<>();
int newDefaultPermits = defaultPermitsBeforeQuery / 2;
updatedClusterConfigs.put(operationsThrottler instanceof SegmentAllIndexPreprocessThrottler
? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES
: operationsThrottler instanceof SegmentStarTreePreprocessThrottler
? CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES
: CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES,
String.valueOf(defaultPermitsBeforeQuery / 2));
String.valueOf(newDefaultPermits));
operationsThrottler.onChange(updatedClusterConfigs.keySet(), updatedClusterConfigs);
Assert.assertEquals(operationsThrottler.totalPermits(), defaultPermitsBeforeQuery / 2);
Assert.assertEquals(operationsThrottler.totalPermits(), newDefaultPermits);
// We doubled permits but took all of the previous ones
Assert.assertEquals(operationsThrottler.availablePermits(), -(defaultPermitsBeforeQuery / 2));
Assert.assertEquals(operationsThrottler.availablePermits(), newDefaultPermits - numPermitsToTake);

// Once the server is ready to server queries, we should reset the throttling configurations to be as configured
operationsThrottler.startServingQueries();
Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits);
Assert.assertEquals(operationsThrottler.availablePermits(),
initialPermits - defaultPermitsBeforeQuery);
Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits - numPermitsToTake);

for (int i = 0; i < defaultPermitsBeforeQuery; i++) {
for (int i = 0; i < numPermitsToTake; i++) {
operationsThrottler.release();
Assert.assertEquals(operationsThrottler.availablePermits(),
(initialPermits - defaultPermitsBeforeQuery) + i + 1);
Assert.assertEquals(operationsThrottler.availablePermits(), (initialPermits - numPermitsToTake) + i + 1);
}
Assert.assertEquals(operationsThrottler.totalPermits(), initialPermits);
Assert.assertEquals(operationsThrottler.availablePermits(), initialPermits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,26 +252,33 @@ public static class Instance {
// Preprocess throttle configs
public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM =
"pinot.server.max.segment.preprocess.parallelism";
public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM = String.valueOf(100);
// Setting to Integer.MAX_VALUE to effectively disable throttling by default
public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM = String.valueOf(Integer.MAX_VALUE);
// Before serving queries is enabled, we should use a higher preprocess parallelism to process segments faster
public static final String CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES =
"pinot.server.max.segment.preprocess.parallelism.before.serving.queries";
// Use the below default before enabling queries on the server
public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES = String.valueOf(100);
// Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default
public static final String DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES =
String.valueOf(Integer.MAX_VALUE);
// Preprocess throttle config specifically for StarTree index rebuild
public static final String CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM =
"pinot.server.max.segment.startree.preprocess.parallelism";
public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = String.valueOf(100);
// Setting to Integer.MAX_VALUE to effectively disable throttling by default
public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM = String.valueOf(Integer.MAX_VALUE);
public static final String CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES =
"pinot.server.max.segment.startree.preprocess.parallelism.before.serving.queries";
// Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default
public static final String DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES =
String.valueOf(100);
String.valueOf(Integer.MAX_VALUE);
public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM =
"pinot.server.max.segment.download.parallelism";
public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM = "100";
// Setting to Integer.MAX_VALUE to effectively disable throttling by default
public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM = String.valueOf(Integer.MAX_VALUE);
public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES =
"pinot.server.max.segment.download.parallelism.before.serving.queries";
public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES = "100";
// Setting the before serving queries to Integer.MAX_VALUE to effectively disable throttling by default
public static final String DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES =
String.valueOf(Integer.MAX_VALUE);
}

public static class Broker {
Expand Down
Loading