Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Helix to 1.4.3 #15063

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
Expand Down Expand Up @@ -143,24 +141,82 @@ public static List<String> assignSegmentWithReplicaGroup(Map<String, Map<String,
}

/**
* Rebalances the table with Helix AutoRebalanceStrategy.
* Rebalances the table for the non-replica-group based segment assignment strategy by uniformly spraying segment
* replicas to the instances.
* <ul>
* <li>
* 1. Calculate the target number of segments on each instance
* </li>
* <li>
* 2. Loop over all the segments and keep the current assignment if the target number of segments for the instance
* has not been reached, and track the segments that are not assigned
* </li>
* <li>
* 3. Assign the left-over segments to the instances with the least segments, or the smallest index if there is a
* tie
* </li>
* </ul>
*/
public static Map<String, Map<String, String>> rebalanceTableWithHelixAutoRebalanceStrategy(
public static Map<String, Map<String, String>> rebalanceNonReplicaGroupBasedTable(
Map<String, Map<String, String>> currentAssignment, List<String> instances, int replication) {
// Use Helix AutoRebalanceStrategy to rebalance the table
LinkedHashMap<String, Integer> states = new LinkedHashMap<>();
states.put(SegmentStateModel.ONLINE, replication);
AutoRebalanceStrategy autoRebalanceStrategy =
new AutoRebalanceStrategy(null, new ArrayList<>(currentAssignment.keySet()), states);
// Make a copy of the current assignment because this step might change the passed in assignment
Map<String, Map<String, String>> currentAssignmentCopy = new TreeMap<>();
Map<String, Integer> instanceNameToIdMap = getInstanceNameToIdMap(instances);

// Calculate target number of segments per instance
// NOTE: in order to minimize the segment movements, use the ceiling of the quotient
int numInstances = instances.size();
int numSegments = currentAssignment.size();
int targetNumSegmentsPerInstance = (numSegments * replication + numInstances - 1) / numInstances;

// Do not move a segment if the target number of segments is not reached; track the segments that need to be moved
Map<String, Map<String, String>> newAssignment = new TreeMap<>();
int[] numSegmentsAssignedPerInstance = new int[numInstances];
List<String> segmentsNotAssigned = new ArrayList<>();
for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
currentAssignmentCopy.put(segmentName, new TreeMap<>(instanceStateMap));
Set<String> currentInstances = entry.getValue().keySet();
int remainingReplicas = replication;
for (String instanceName : currentInstances) {
Integer instanceId = instanceNameToIdMap.get(instanceName);
if (instanceId != null && numSegmentsAssignedPerInstance[instanceId] < targetNumSegmentsPerInstance) {
newAssignment.computeIfAbsent(segmentName, k -> new TreeMap<>()).put(instanceName, SegmentStateModel.ONLINE);
numSegmentsAssignedPerInstance[instanceId]++;
remainingReplicas--;
if (remainingReplicas == 0) {
break;
}
}
}
for (int i = 0; i < remainingReplicas; i++) {
segmentsNotAssigned.add(segmentName);
}
}

// Assign each not assigned segment to the instance with the least segments, or the smallest id if there is a tie
PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances, Pairs.intPairComparator());
for (int instanceId = 0; instanceId < numInstances; instanceId++) {
heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], instanceId));
}
return autoRebalanceStrategy.computePartitionAssignment(instances, instances, currentAssignmentCopy, null)
.getMapFields();
List<Pairs.IntPair> skippedPairs = new ArrayList<>();
for (String segmentName : segmentsNotAssigned) {
Map<String, String> instanceStateMap = newAssignment.computeIfAbsent(segmentName, k -> new TreeMap<>());
while (true) {
Pairs.IntPair intPair = heap.remove();
int instanceId = intPair.getRight();
String instanceName = instances.get(instanceId);
// Skip the instance if it already has the segment
if (instanceStateMap.put(instanceName, SegmentStateModel.ONLINE) == null) {
intPair.setLeft(intPair.getLeft() + 1);
heap.add(intPair);
break;
} else {
skippedPairs.add(intPair);
}
}
heap.addAll(skippedPairs);
skippedPairs.clear();
}

return newAssignment;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,16 @@
public class BalancedNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(BalancedNumSegmentAssignmentStrategy.class);

private String _tableNameWithType;
private int _replication;

@Override
public void init(HelixManager helixManager, TableConfig tableConfig) {
_tableNameWithType = tableConfig.getTableName();
String tableNameWithType = tableConfig.getTableName();
SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig();
Preconditions.checkState(validationAndRetentionConfig != null, "Validation Config is null");
_replication = tableConfig.getReplication();
LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: " + "{} with replication: {}",
_tableNameWithType, _replication);
tableNameWithType, _replication);
}

@Override
Expand All @@ -70,7 +69,7 @@ public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String,
List<String> instances =
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication);
newAssignment =
SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, _replication);
SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, instances, _replication);
return newAssignment;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
Expand Down Expand Up @@ -98,6 +96,7 @@
import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
import org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionStrategy;
import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
Expand Down Expand Up @@ -1312,16 +1311,10 @@ private void handleSegmentMovement(String realtimeTableName, Map<String, Map<Str
newConsumingSegment, newInstances, realtimeTableName, instancesNoLongerServe);

ClusterMessagingService messagingService = _helixManager.getMessagingService();
IngestionMetricsRemoveMessage message = new IngestionMetricsRemoveMessage();
List<String> instancesSent = new ArrayList<>(instancesNoLongerServe.size());
for (String instance : instancesNoLongerServe) {
Criteria recipientCriteria = new Criteria();
recipientCriteria.setInstanceName(instance);
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setResource(realtimeTableName);
recipientCriteria.setPartition(committedSegment);
recipientCriteria.setSessionSpecific(true);
IngestionMetricsRemoveMessage message = new IngestionMetricsRemoveMessage();
if (messagingService.send(recipientCriteria, message, null, -1) > 0) {
if (MessagingServiceUtils.send(messagingService, message, realtimeTableName, committedSegment, instance) > 0) {
instancesSent.add(instance);
} else {
LOGGER.warn("Failed to send ingestion metrics remove message for table: {} segment: {} to instance: {}",
Expand Down Expand Up @@ -2251,13 +2244,9 @@ public IdealState updatePauseStateInIdealState(String tableNameWithType, boolean

private void sendForceCommitMessageToServers(String tableNameWithType, Set<String> consumingSegments) {
if (!consumingSegments.isEmpty()) {
Criteria recipientCriteria = new Criteria();
recipientCriteria.setInstanceName("%");
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setSessionSpecific(true);
ClusterMessagingService messagingService = _helixManager.getMessagingService();
ForceCommitMessage message = new ForceCommitMessage(tableNameWithType, consumingSegments);
int numMessagesSent = _helixManager.getMessagingService().send(recipientCriteria, message, null, -1);
int numMessagesSent = MessagingServiceUtils.send(messagingService, message, tableNameWithType);
if (numMessagesSent > 0) {
LOGGER.info("Sent {} force commit messages for table: {} segments: {}", numMessagesSent, tableNameWithType,
consumingSegments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import java.util.function.Consumer;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.InstanceType;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.metrics.ControllerMetrics;
Expand All @@ -45,6 +43,7 @@
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.helix.core.util.MessagingServiceUtils;
import org.apache.pinot.controller.util.TableTierReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand Down Expand Up @@ -262,7 +261,7 @@ static void triggerLocalTierMigration(String tableNameWithType, TableTierReader.
}
}
}
if (serverToSegmentsToMigrate.size() > 0) {
if (!serverToSegmentsToMigrate.isEmpty()) {
LOGGER.info("Notify servers: {} to move segments to new tiers locally", serverToSegmentsToMigrate.keySet());
reloadSegmentsForLocalTierMigration(tableNameWithType, serverToSegmentsToMigrate, messagingService);
} else {
Expand All @@ -275,17 +274,13 @@ private static void reloadSegmentsForLocalTierMigration(String tableNameWithType
for (Map.Entry<String, Set<String>> entry : serverToSegmentsToMigrate.entrySet()) {
String serverName = entry.getKey();
Set<String> segmentNames = entry.getValue();
LOGGER.info("Sending SegmentReloadMessage to server: {} to reload segments: {} of table: {}", serverName,
segmentNames, tableNameWithType);
// One SegmentReloadMessage per server but takes all segment names.
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName(serverName);
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setSessionSpecific(true);
SegmentReloadMessage segmentReloadMessage =
new SegmentReloadMessage(tableNameWithType, new ArrayList<>(segmentNames), false);
LOGGER.info("Sending SegmentReloadMessage to server: {} to reload segments: {} of table: {}", serverName,
segmentNames, tableNameWithType);
int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, -1);
int numMessagesSent =
MessagingServiceUtils.send(messagingService, segmentReloadMessage, tableNameWithType, null, serverName);
if (numMessagesSent > 0) {
LOGGER.info("Sent SegmentReloadMessage to server: {} for table: {}", serverName, tableNameWithType);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix.core.util;

import javax.annotation.Nullable;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.Criteria;
import org.apache.helix.InstanceType;
import org.apache.helix.model.Message;


/**
 * Static helpers for sending Helix messages through a {@link ClusterMessagingService}.
 *
 * <p>Every helper returns the count reported by the messaging service and treats a failed send as "0 messages sent"
 * instead of propagating the exception, so callers only need to check whether the result is positive.
 */
public final class MessagingServiceUtils {
  private MessagingServiceUtils() {
  }

  /**
   * Sends a message to the recipients specified by the criteria.
   *
   * @param messagingService messaging service used to deliver the message
   * @param message message to send
   * @param criteria criteria selecting the recipients
   * @return the number of messages sent as reported by the messaging service, or 0 if the send threw an exception
   */
  public static int send(ClusterMessagingService messagingService, Message message, Criteria criteria) {
    try {
      return messagingService.send(criteria, message);
    } catch (Exception ignored) {
      // NOTE:
      // The send can throw an exception when the target resource doesn't exist (e.g. ExternalView has not been created
      // yet). This is a normal case, and we count it as no message being sent.
      return 0;
    }
  }

  /**
   * Builds a session-specific criteria targeting PARTICIPANT instances of the given resource and sends the message.
   *
   * @param partition partition to target; not set on the criteria when {@code null}
   * @param instanceName instance to target; not set on the criteria when {@code null}
   * @param includingSelf when {@code false}, the criteria is marked as self-excluded
   * @return the number of messages sent, or 0 if the send threw an exception
   */
  private static int send(ClusterMessagingService messagingService, Message message, String resource,
      @Nullable String partition, @Nullable String instanceName, boolean includingSelf) {
    Criteria criteria = new Criteria();
    criteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
    criteria.setSessionSpecific(true);
    criteria.setResource(resource);
    if (partition != null) {
      criteria.setPartition(partition);
    }
    if (instanceName != null) {
      criteria.setInstanceName(instanceName);
    }
    criteria.setSelfExcluded(!includingSelf);
    return send(messagingService, message, criteria);
  }

  /**
   * Sends a message for the given resource, optionally narrowed to a partition and/or an instance. The criteria is
   * marked as self-excluded.
   *
   * @param partition partition to target, or {@code null} to leave the partition unset on the criteria
   * @param instanceName instance to target, or {@code null} to leave the instance name unset on the criteria
   * @return the number of messages sent, or 0 if the send threw an exception
   */
  public static int send(ClusterMessagingService messagingService, Message message, String resource,
      @Nullable String partition, @Nullable String instanceName) {
    return send(messagingService, message, resource, partition, instanceName, false);
  }

  /**
   * Sends a message for the given resource without setting a partition or instance name on the criteria. The criteria
   * is marked as self-excluded.
   *
   * @return the number of messages sent, or 0 if the send threw an exception
   */
  public static int send(ClusterMessagingService messagingService, Message message, String resource) {
    return send(messagingService, message, resource, null, null, false);
  }

  /**
   * Same as {@link #send(ClusterMessagingService, Message, String)}, except the criteria is not marked as
   * self-excluded.
   *
   * @return the number of messages sent, or 0 if the send threw an exception
   */
  public static int sendIncludingSelf(ClusterMessagingService messagingService, Message message, String resource) {
    return send(messagingService, message, resource, null, null, true);
  }
}
Loading
Loading