diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 1f983ce6718d..908c03b6f029 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -63,12 +63,10 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.AccessOption; import org.apache.helix.ClusterMessagingService; -import org.apache.helix.Criteria; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; -import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; @@ -155,6 +153,7 @@ import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver; +import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils; import org.apache.pinot.controller.helix.starter.HelixConfig; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.DatabaseConfig; @@ -195,10 +194,8 @@ public class PinotHelixResourceManager { private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f); private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 5; public static final String APPEND = "APPEND"; - private static final int DEFAULT_IDEAL_STATE_UPDATER_LOCKERS_SIZE = 500; private static final int DEFAULT_LINEAGE_UPDATER_LOCKERS_SIZE = 500; private static final String API_REQUEST_ID_PREFIX = "api-"; - private static final int INFINITE_TIMEOUT = -1; private enum LineageUpdateType { START, END, REVERT @@ -207,8 +204,6 @@ private enum LineageUpdateType { // TODO: make this configurable public static final long EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS = 10 * 60_000L; // 10 minutes public static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 second - public static final long SEGMENT_CLEANUP_TIMEOUT_MS = 20 * 60_000L; // 20 minutes - public static final long SEGMENT_CLEANUP_CHECK_INTERVAL_MS = 1_000L; // 1 second private static final DateTimeFormatter SIMPLE_DATE_FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC); @@ -2579,17 +2574,9 @@ private void deleteTableOnServers(String tableNameWithType) { } LOGGER.info("Sending delete table messages for table: {}", tableNameWithType); - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setSessionSpecific(true); - TableDeletionMessage tableDeletionMessage = new TableDeletionMessage(tableNameWithType); ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); - - // Infinite timeout on the recipient - int timeoutMs = -1; - int numMessagesSent = messagingService.send(recipientCriteria, tableDeletionMessage, null, timeoutMs); + TableDeletionMessage tableDeletionMessage = new TableDeletionMessage(tableNameWithType); + int numMessagesSent = MessagingServiceUtils.send(messagingService, tableDeletionMessage, tableNameWithType); if (numMessagesSent > 0) { LOGGER.info("Sent {} delete table messages for table: {}", numMessagesSent, tableNameWithType); } else { @@ -2634,20 +2621,15 @@ public Map> reloadSegments(String tableNameWithTyp Preconditions.checkArgument(tt == TableType.OFFLINE, "Table: %s is not an OFFLINE table, which is required to force to download segments", tableNameWithType); } - // Infinite timeout on the recipient - int timeoutMs = -1; + + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); Map> instanceMsgInfoMap = new HashMap<>(); for (Map.Entry> entry : instanceToSegmentsMap.entrySet()) { String targetInstance = entry.getKey(); - List segments = entry.getValue(); - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName(targetInstance); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setSessionSpecific(true); - SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, segments, forceDownload); - ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); - int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, timeoutMs); + SegmentReloadMessage segmentReloadMessage = + new SegmentReloadMessage(tableNameWithType, entry.getValue(), forceDownload); + int numMessagesSent = + MessagingServiceUtils.send(messagingService, segmentReloadMessage, tableNameWithType, null, targetInstance); if (numMessagesSent > 0) { LOGGER.info("Sent {} reload messages to instance: {} for table: {}", numMessagesSent, targetInstance, tableNameWithType); @@ -2671,17 +2653,10 @@ public Pair reloadAllSegments(String tableNameWithType, boolean "Table: %s is not an OFFLINE table, which is required to force to download segments", tableNameWithType); } - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName(targetInstance == null ? "%" : targetInstance); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setSessionSpecific(true); - SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, forceDownload); ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); - - // Infinite timeout on the recipient - int timeoutMs = -1; - int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, timeoutMs); + SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, forceDownload); + int numMessagesSent = + MessagingServiceUtils.send(messagingService, segmentReloadMessage, tableNameWithType, null, targetInstance); if (numMessagesSent > 0) { LOGGER.info("Sent {} reload messages for table: {}", numMessagesSent, tableNameWithType); } else { @@ -2704,19 +2679,12 @@ public Pair reloadSegment(String tableNameWithType, String segm segmentName); } - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName(targetInstance == null ? "%" : targetInstance); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setPartition(segmentName); - recipientCriteria.setSessionSpecific(true); - SegmentReloadMessage segmentReloadMessage = - new SegmentReloadMessage(tableNameWithType, Collections.singletonList(segmentName), forceDownload); ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); - - // Infinite timeout on the recipient - int timeoutMs = -1; - int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, timeoutMs); + SegmentReloadMessage segmentReloadMessage = + new SegmentReloadMessage(tableNameWithType, List.of(segmentName), forceDownload); + int numMessagesSent = + MessagingServiceUtils.send(messagingService, segmentReloadMessage, tableNameWithType, segmentName, + targetInstance); if (numMessagesSent > 0) { LOGGER.info("Sent {} reload messages for segment: {} in table: {}", numMessagesSent, segmentName, tableNameWithType); @@ -2810,8 +2778,8 @@ private static Set parseInstanceSet(IdealState idealState, String segmen * However instead of resetting only the ERROR state to its initial state. we reset all state regardless. */ private void resetPartitionAllState(String instanceName, String resourceName, Set resetPartitionNames) { - LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster {}.", - resetPartitionNames == null ? "NULL" : resetPartitionNames, resourceName, instanceName, _helixClusterName); + LOGGER.info("Resetting partitions: {} for resource: {} on instance: {}", resetPartitionNames, resourceName, + instanceName); HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); @@ -2847,7 +2815,7 @@ private void resetPartitionAllState(String instanceName, String resourceName, Se resourceName, resetPartitionNames, instanceName, message, message.getResourceName())); } - String adminName = null; + String adminName; try { adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN"; } catch (UnknownHostException e) { @@ -2899,21 +2867,13 @@ private void resetPartitionAllState(String instanceName, String resourceName, Se */ public void sendSegmentRefreshMessage(String tableNameWithType, String segmentName, boolean refreshServerSegment, boolean refreshBrokerRouting) { + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); SegmentRefreshMessage segmentRefreshMessage = new SegmentRefreshMessage(tableNameWithType, segmentName); // Send segment refresh message to servers - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setSessionSpecific(true); - ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); - if (refreshServerSegment) { - // Send segment refresh message to servers - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setPartition(segmentName); - // Send message with no callback and infinite timeout on the recipient - int numMessagesSent = messagingService.send(recipientCriteria, segmentRefreshMessage, null, -1); + int numMessagesSent = + MessagingServiceUtils.send(messagingService, segmentRefreshMessage, tableNameWithType, segmentName, null); if (numMessagesSent > 0) { // TODO: Would be nice if we can get the name of the instances to which messages were sent LOGGER.info("Sent {} segment refresh messages to servers for segment: {} of table: {}", numMessagesSent, @@ -2924,11 +2884,11 @@ public void sendSegmentRefreshMessage(String tableNameWithType, String segmentNa } } + // Send segment refresh message to brokers if (refreshBrokerRouting) { - // Send segment refresh message to brokers - recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); - recipientCriteria.setPartition(tableNameWithType); - int numMessagesSent = messagingService.send(recipientCriteria, segmentRefreshMessage, null, -1); + int numMessagesSent = + MessagingServiceUtils.send(messagingService, segmentRefreshMessage, Helix.BROKER_RESOURCE_INSTANCE, + tableNameWithType, null); if (numMessagesSent > 0) { // TODO: Would be nice if we can get the name of the instances to which messages were sent LOGGER.info("Sent {} segment refresh messages to brokers for segment: {} of table: {}", numMessagesSent, @@ -2941,18 +2901,11 @@ public void sendSegmentRefreshMessage(String tableNameWithType, String segmentNa } private void sendTableConfigRefreshMessage(String tableNameWithType) { + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); TableConfigRefreshMessage tableConfigRefreshMessage = new TableConfigRefreshMessage(tableNameWithType); - - // Send table config refresh message to brokers - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); - recipientCriteria.setSessionSpecific(true); - recipientCriteria.setPartition(tableNameWithType); - // Send message with no callback and infinite timeout on the recipient int numMessagesSent = - _helixZkManager.getMessagingService().send(recipientCriteria, tableConfigRefreshMessage, null, -1); + MessagingServiceUtils.send(messagingService, tableConfigRefreshMessage, Helix.BROKER_RESOURCE_INSTANCE, + tableNameWithType, null); if (numMessagesSent > 0) { // TODO: Would be nice if we can get the name of the instances to which messages were sent LOGGER.info("Sent {} table config refresh messages to brokers for table: {}", numMessagesSent, tableNameWithType); @@ -2962,18 +2915,12 @@ private void sendTableConfigRefreshMessage(String tableNameWithType) { } private void sendApplicationQpsQuotaRefreshMessage(String appName) { - ApplicationQpsQuotaRefreshMessage message = new ApplicationQpsQuotaRefreshMessage(appName); - - // Send database config refresh message to brokers - Criteria criteria = new Criteria(); - criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - criteria.setInstanceName("%"); - criteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); - criteria.setSessionSpecific(true); - - int numMessagesSent = _helixZkManager.getMessagingService().send(criteria, message, null, INFINITE_TIMEOUT); + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); + ApplicationQpsQuotaRefreshMessage quotaRefreshMessage = new ApplicationQpsQuotaRefreshMessage(appName); + int numMessagesSent = + MessagingServiceUtils.send(messagingService, quotaRefreshMessage, Helix.BROKER_RESOURCE_INSTANCE); if (numMessagesSent > 0) { - LOGGER.info("Sent {} applcation qps quota refresh messages to brokers for application: {}", numMessagesSent, + LOGGER.info("Sent {} application qps quota refresh messages to brokers for application: {}", numMessagesSent, appName); } else { LOGGER.warn("No application qps quota refresh message sent to brokers for application: {}", appName); @@ -2981,17 +2928,10 @@ private void sendApplicationQpsQuotaRefreshMessage(String appName) { } private void sendDatabaseConfigRefreshMessage(String databaseName) { + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); DatabaseConfigRefreshMessage databaseConfigRefreshMessage = new DatabaseConfigRefreshMessage(databaseName); - - // Send database config refresh message to brokers - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); - recipientCriteria.setSessionSpecific(true); - // Send message with no callback and infinite timeout on the recipient int numMessagesSent = - _helixZkManager.getMessagingService().send(recipientCriteria, databaseConfigRefreshMessage, null, -1); + MessagingServiceUtils.send(messagingService, databaseConfigRefreshMessage, Helix.BROKER_RESOURCE_INSTANCE); if (numMessagesSent > 0) { LOGGER.info("Sent {} database config refresh messages to brokers for database: {}", numMessagesSent, databaseName); @@ -3001,18 +2941,11 @@ private void sendDatabaseConfigRefreshMessage(String databaseName) { } private void sendRoutingTableRebuildMessage(String tableNameWithType) { + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); RoutingTableRebuildMessage routingTableRebuildMessage = new RoutingTableRebuildMessage(tableNameWithType); - - // Send table config refresh message to brokers - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); - recipientCriteria.setSessionSpecific(true); - recipientCriteria.setPartition(tableNameWithType); - // Send message with no callback and infinite timeout on the recipient int numMessagesSent = - _helixZkManager.getMessagingService().send(recipientCriteria, routingTableRebuildMessage, null, -1); + MessagingServiceUtils.send(messagingService, routingTableRebuildMessage, Helix.BROKER_RESOURCE_INSTANCE, + tableNameWithType, null); if (numMessagesSent > 0) { // TODO: Would be nice if we can get the name of the instances to which messages were sent LOGGER.info("Sent {} routing table rebuild messages to brokers for table: {}", numMessagesSent, @@ -4515,27 +4448,17 @@ public int getNumReplicas(TableConfig tableConfig) { public PeriodicTaskInvocationResponse invokeControllerPeriodicTask(String tableName, String periodicTaskName, Map taskProperties) { String periodicTaskRequestId = API_REQUEST_ID_PREFIX + UUID.randomUUID().toString().substring(0, 8); - LOGGER.info("[TaskRequestId: {}] Sending periodic task message to all controllers for running task {} against {}," + " with properties {}.\"", periodicTaskRequestId, periodicTaskName, tableName != null ? " table '" + tableName + "'" : "all tables", taskProperties); - - // Create and send message to send to all controllers (including this one) - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setSessionSpecific(true); - recipientCriteria.setResource(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME); - recipientCriteria.setSelfExcluded(false); + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); RunPeriodicTaskMessage runPeriodicTaskMessage = new RunPeriodicTaskMessage(periodicTaskRequestId, periodicTaskName, tableName, taskProperties); - - ClusterMessagingService clusterMessagingService = getHelixZkManager().getMessagingService(); - int messageCount = clusterMessagingService.send(recipientCriteria, runPeriodicTaskMessage, null, -1); - + int numMessagesSent = MessagingServiceUtils.sendIncludingSelf(messagingService, runPeriodicTaskMessage, + Helix.LEAD_CONTROLLER_RESOURCE_NAME); LOGGER.info("[TaskRequestId: {}] Periodic task execution message sent to {} controllers.", periodicTaskRequestId, - messageCount); - return new PeriodicTaskInvocationResponse(periodicTaskRequestId, messageCount > 0); + numMessagesSent); + return new PeriodicTaskInvocationResponse(periodicTaskRequestId, numMessagesSent > 0); } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java index 727bdbdfda51..9c7e37e86e74 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java @@ -25,7 +25,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.PriorityQueue; @@ -33,7 +32,6 @@ import java.util.TreeMap; import javax.annotation.Nullable; import org.apache.helix.HelixManager; -import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -143,24 +141,82 @@ public static List assignSegmentWithReplicaGroup(Map + *
  • + * 1. Calculate the target number of segments on each instance + *
  • + *
  • + * 2. Loop over all the segments and keep the assignment if target number of segments for the instance has not + * been reached and track the not assigned segments + *
  • + *
  • + * 3. Assign the left-over segments to the instances with the least segments, or the smallest index if there is a + * tie + *
  • + * */ - public static Map> rebalanceTableWithHelixAutoRebalanceStrategy( + public static Map> rebalanceNonReplicaGroupBasedTable( Map> currentAssignment, List instances, int replication) { - // Use Helix AutoRebalanceStrategy to rebalance the table - LinkedHashMap states = new LinkedHashMap<>(); - states.put(SegmentStateModel.ONLINE, replication); - AutoRebalanceStrategy autoRebalanceStrategy = - new AutoRebalanceStrategy(null, new ArrayList<>(currentAssignment.keySet()), states); - // Make a copy of the current assignment because this step might change the passed in assignment - Map> currentAssignmentCopy = new TreeMap<>(); + Map instanceNameToIdMap = getInstanceNameToIdMap(instances); + + // Calculate target number of segments per instance + // NOTE: in order to minimize the segment movements, use the ceiling of the quotient + int numInstances = instances.size(); + int numSegments = currentAssignment.size(); + int targetNumSegmentsPerInstance = (numSegments * replication + numInstances - 1) / numInstances; + + // Do not move segment if target number of segments is not reached, track the segments need to be moved + Map> newAssignment = new TreeMap<>(); + int[] numSegmentsAssignedPerInstance = new int[numInstances]; + List segmentsNotAssigned = new ArrayList<>(); for (Map.Entry> entry : currentAssignment.entrySet()) { String segmentName = entry.getKey(); - Map instanceStateMap = entry.getValue(); - currentAssignmentCopy.put(segmentName, new TreeMap<>(instanceStateMap)); + Set currentInstances = entry.getValue().keySet(); + int remainingReplicas = replication; + for (String instanceName : currentInstances) { + Integer instanceId = instanceNameToIdMap.get(instanceName); + if (instanceId != null && numSegmentsAssignedPerInstance[instanceId] < targetNumSegmentsPerInstance) { + newAssignment.computeIfAbsent(segmentName, k -> new TreeMap<>()).put(instanceName, SegmentStateModel.ONLINE); + numSegmentsAssignedPerInstance[instanceId]++; + remainingReplicas--; + if (remainingReplicas == 0) { + break; + } + } + } + for (int i = 0; i < remainingReplicas; i++) { + segmentsNotAssigned.add(segmentName); + } + } + + // Assign each not assigned segment to the instance with the least segments, or the smallest id if there is a tie + PriorityQueue heap = new PriorityQueue<>(numInstances, Pairs.intPairComparator()); + for (int instanceId = 0; instanceId < numInstances; instanceId++) { + heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], instanceId)); } - return autoRebalanceStrategy.computePartitionAssignment(instances, instances, currentAssignmentCopy, null) - .getMapFields(); + List skippedPairs = new ArrayList<>(); + for (String segmentName : segmentsNotAssigned) { + Map instanceStateMap = newAssignment.computeIfAbsent(segmentName, k -> new TreeMap<>()); + while (true) { + Pairs.IntPair intPair = heap.remove(); + int instanceId = intPair.getRight(); + String instanceName = instances.get(instanceId); + // Skip the instance if it already has the segment + if (instanceStateMap.put(instanceName, SegmentStateModel.ONLINE) == null) { + intPair.setLeft(intPair.getLeft() + 1); + heap.add(intPair); + break; + } else { + skippedPairs.add(intPair); + } + } + heap.addAll(skippedPairs); + skippedPairs.clear(); + } + + return newAssignment; } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java index e9c540da78a2..2a714e61fc69 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java @@ -42,17 +42,16 @@ public class BalancedNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy { private static final Logger LOGGER = LoggerFactory.getLogger(BalancedNumSegmentAssignmentStrategy.class); - private String _tableNameWithType; private int _replication; @Override public void init(HelixManager helixManager, TableConfig tableConfig) { - _tableNameWithType = tableConfig.getTableName(); + String tableNameWithType = tableConfig.getTableName(); SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig(); Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null"); _replication = tableConfig.getReplication(); LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: " + "{} with replication: {}", - _tableNameWithType, _replication); + tableNameWithType, _replication); } @Override @@ -70,7 +69,7 @@ public Map> reassignSegments(Map instances = SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication); newAssignment = - SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, _replication); + SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, instances, _replication); return newAssignment; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 7121dfe1dc9c..95cc87675433 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -54,10 +54,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.helix.AccessOption; import org.apache.helix.ClusterMessagingService; -import org.apache.helix.Criteria; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; -import org.apache.helix.InstanceType; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -98,6 +96,7 @@ import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater; import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy; import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy; +import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils; import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager; import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils; import org.apache.pinot.core.util.PeerServerSegmentFinder; @@ -1312,16 +1311,10 @@ private void handleSegmentMovement(String realtimeTableName, Map instancesSent = new ArrayList<>(instancesNoLongerServe.size()); for (String instance : instancesNoLongerServe) { - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setInstanceName(instance); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setResource(realtimeTableName); - recipientCriteria.setPartition(committedSegment); - recipientCriteria.setSessionSpecific(true); - IngestionMetricsRemoveMessage message = new IngestionMetricsRemoveMessage(); - if (messagingService.send(recipientCriteria, message, null, -1) > 0) { + if (MessagingServiceUtils.send(messagingService, message, realtimeTableName, committedSegment, instance) > 0) { instancesSent.add(instance); } else { LOGGER.warn("Failed to send ingestion metrics remove message for table: {} segment: {} to instance: {}", @@ -2251,13 +2244,9 @@ public IdealState updatePauseStateInIdealState(String tableNameWithType, boolean private void sendForceCommitMessageToServers(String tableNameWithType, Set consumingSegments) { if (!consumingSegments.isEmpty()) { - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setInstanceName("%"); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setSessionSpecific(true); + ClusterMessagingService messagingService = _helixManager.getMessagingService(); ForceCommitMessage message = new ForceCommitMessage(tableNameWithType, consumingSegments); - int numMessagesSent = _helixManager.getMessagingService().send(recipientCriteria, message, null, -1); + int numMessagesSent = MessagingServiceUtils.send(messagingService, message, tableNameWithType); if (numMessagesSent > 0) { LOGGER.info("Sent {} force commit messages for table: {} segments: {}", numMessagesSent, tableNameWithType, consumingSegments); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java index 13b4bae7f15f..c6fe9bf26fec 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java @@ -32,8 +32,6 @@ import java.util.function.Consumer; import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.helix.ClusterMessagingService; -import org.apache.helix.Criteria; -import org.apache.helix.InstanceType; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.messages.SegmentReloadMessage; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -45,6 +43,7 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; +import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils; import org.apache.pinot.controller.util.TableTierReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -262,7 +261,7 @@ static void triggerLocalTierMigration(String tableNameWithType, TableTierReader. } } } - if (serverToSegmentsToMigrate.size() > 0) { + if (!serverToSegmentsToMigrate.isEmpty()) { LOGGER.info("Notify servers: {} to move segments to new tiers locally", serverToSegmentsToMigrate.keySet()); reloadSegmentsForLocalTierMigration(tableNameWithType, serverToSegmentsToMigrate, messagingService); } else { @@ -275,17 +274,13 @@ private static void reloadSegmentsForLocalTierMigration(String tableNameWithType for (Map.Entry> entry : serverToSegmentsToMigrate.entrySet()) { String serverName = entry.getKey(); Set segmentNames = entry.getValue(); + LOGGER.info("Sending SegmentReloadMessage to server: {} to reload segments: {} of table: {}", serverName, + segmentNames, tableNameWithType); // One SegmentReloadMessage per server but takes all segment names. - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName(serverName); - recipientCriteria.setResource(tableNameWithType); - recipientCriteria.setSessionSpecific(true); SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, new ArrayList<>(segmentNames), false); - LOGGER.info("Sending SegmentReloadMessage to server: {} to reload segments: {} of table: {}", serverName, - segmentNames, tableNameWithType); - int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, -1); + int numMessagesSent = + MessagingServiceUtils.send(messagingService, segmentReloadMessage, tableNameWithType, null, serverName); if (numMessagesSent > 0) { LOGGER.info("Sent SegmentReloadMessage to server: {} for table: {}", serverName, tableNameWithType); } else { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java new file mode 100644 index 000000000000..f977240f96be --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/MessagingServiceUtils.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.util; + +import javax.annotation.Nullable; +import org.apache.helix.ClusterMessagingService; +import org.apache.helix.Criteria; +import org.apache.helix.InstanceType; +import org.apache.helix.model.Message; + + +public class MessagingServiceUtils { + private MessagingServiceUtils() { + } + + /** + * Sends a message to the recipients specified by the criteria, returns the messages being sent. + */ + public static int send(ClusterMessagingService messagingService, Message message, Criteria criteria) { + try { + return messagingService.send(criteria, message); + } catch (Exception e) { + // NOTE: + // It can throw exception when the target resource doesn't exist (e.g. ExternalView has not been created yet). It + // is normal case, and we count it as no message being sent. + return 0; + } + } + + private static int send(ClusterMessagingService messagingService, Message message, String resource, + @Nullable String partition, @Nullable String instanceName, boolean includingSelf) { + Criteria criteria = new Criteria(); + criteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + criteria.setSessionSpecific(true); + criteria.setResource(resource); + if (partition != null) { + criteria.setPartition(partition); + } + if (instanceName != null) { + criteria.setInstanceName(instanceName); + } + criteria.setSelfExcluded(!includingSelf); + return send(messagingService, message, criteria); + } + + public static int send(ClusterMessagingService messagingService, Message message, String resource, + @Nullable String partition, @Nullable String instanceName) { + return send(messagingService, message, resource, partition, instanceName, false); + } + + public static int send(ClusterMessagingService messagingService, Message message, String resource) { + return send(messagingService, message, resource, null, null, false); + } + + public static int sendIncludingSelf(ClusterMessagingService messagingService, Message message, String resource) { + return send(messagingService, message, resource, null, null, true); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java index 0f43b7869dfa..501db9e572b0 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java @@ -76,8 +76,7 @@ public void testRebalanceTableWithHelixAutoRebalanceStrategy() { Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); // Current assignment should already be balanced - assertEquals( - SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, NUM_REPLICAS), + assertEquals(SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, instances, NUM_REPLICAS), currentAssignment); // Replace instance_0 with instance_10 @@ -85,8 +84,7 @@ public void testRebalanceTableWithHelixAutoRebalanceStrategy() { String newInstanceName = INSTANCE_NAME_PREFIX + 10; newInstances.set(0, newInstanceName); Map> newAssignment = - SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, - NUM_REPLICAS); + SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, newInstances, NUM_REPLICAS); // There should be 100 segments assigned assertEquals(currentAssignment.size(), numSegments); // Each segment should have 3 replicas @@ -116,30 +114,29 @@ public void testRebalanceTableWithHelixAutoRebalanceStrategy() { // } int newNumInstances = numInstances - 5; newInstances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances); - newAssignment = SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, - NUM_REPLICAS); + newAssignment = + SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, newInstances, NUM_REPLICAS); // There should be 100 segments assigned assertEquals(newAssignment.size(), numSegments); // Each segment should have 3 replicas for (Map instanceStateMap : newAssignment.values()) { assertEquals(instanceStateMap.size(), NUM_REPLICAS); } - // The segments are not perfectly balanced, but should be deterministic numSegmentsAssignedPerInstance = SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, newInstances); - assertEquals(numSegmentsAssignedPerInstance[0], 56); + assertEquals(numSegmentsAssignedPerInstance[0], 60); assertEquals(numSegmentsAssignedPerInstance[1], 60); assertEquals(numSegmentsAssignedPerInstance[2], 60); assertEquals(numSegmentsAssignedPerInstance[3], 60); - assertEquals(numSegmentsAssignedPerInstance[4], 64); + assertEquals(numSegmentsAssignedPerInstance[4], 60); numSegmentsToMovePerInstance = SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, newAssignment); assertEquals(numSegmentsToMovePerInstance.size(), numInstances); - assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0), IntIntPair.of(26, 0)); + assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 0), IntIntPair.of(30, 0)); assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 1), IntIntPair.of(30, 0)); assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 2), IntIntPair.of(30, 0)); assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 3), IntIntPair.of(30, 0)); - assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4), IntIntPair.of(34, 0)); + assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + 4), IntIntPair.of(30, 0)); for (int i = 5; i < 10; i++) { assertEquals(numSegmentsToMovePerInstance.get(INSTANCE_NAME_PREFIX + i), IntIntPair.of(0, 30)); } @@ -150,8 +147,8 @@ public void testRebalanceTableWithHelixAutoRebalanceStrategy() { // } newNumInstances = numInstances + 5; newInstances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances); - newAssignment = SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, - NUM_REPLICAS); + newAssignment = + SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, newInstances, NUM_REPLICAS); // There should be 100 segments assigned assertEquals(newAssignment.size(), numSegments); // Each segment should have 3 replicas @@ -182,8 +179,8 @@ public void testRebalanceTableWithHelixAutoRebalanceStrategy() { // } String newInstanceNamePrefix = "i_"; newInstances = SegmentAssignmentTestUtils.getNameList(newInstanceNamePrefix, numInstances); - newAssignment = SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, - NUM_REPLICAS); + newAssignment = + SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, newInstances, NUM_REPLICAS); // There should be 100 segments assigned assertEquals(newAssignment.size(), numSegments); // Each segment should have 3 replicas diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java index 3c08b79d92b5..55029806a3ad 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java @@ -43,7 +43,6 @@ import org.mockito.ArgumentCaptor; import org.testng.annotations.Test; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -93,8 +92,7 @@ public void testTriggerLocalTierMigration() { ArgumentCaptor criteriaCapture = ArgumentCaptor.forClass(Criteria.class); ArgumentCaptor reloadMessageCapture = ArgumentCaptor.forClass(SegmentReloadMessage.class); - verify(messagingService, times(2)).send(criteriaCapture.capture(), reloadMessageCapture.capture(), eq(null), - eq(-1)); + verify(messagingService, times(2)).send(criteriaCapture.capture(), reloadMessageCapture.capture()); List criteriaList = criteriaCapture.getAllValues(); List msgList = reloadMessageCapture.getAllValues(); diff --git a/pom.xml b/pom.xml index 98eb651a005b..f2445a52230d 100644 --- a/pom.xml +++ b/pom.xml @@ -140,7 +140,7 @@ 1.15.0 1.9.5 2.8.1 - 1.3.1 + 1.4.3 0.11 2.18.2 3.9.3