Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stale host / port bug for failure detector #15116

Merged
merged 1 commit into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ private void sendRequest(long requestId, TableType tableType, BrokerRequest brok

public static class PinotServerStreamingQueryClient {
private final Map<String, ServerGrpcQueryClient> _grpcQueryClientMap = new ConcurrentHashMap<>();
private final Map<String, String> _instanceIdToHostnamePortMap = new ConcurrentHashMap<>();
private final GrpcConfig _config;

public PinotServerStreamingQueryClient(GrpcConfig config) {
Expand All @@ -151,7 +150,6 @@ public Iterator<Server.ServerResponse> submit(ServerInstance serverInstance, Ser

private ServerGrpcQueryClient getOrCreateGrpcQueryClient(ServerInstance serverInstance) {
String hostnamePort = String.format("%s_%d", serverInstance.getHostname(), serverInstance.getGrpcPort());
_instanceIdToHostnamePortMap.put(serverInstance.getInstanceId(), hostnamePort);
return _grpcQueryClientMap.computeIfAbsent(hostnamePort,
k -> new ServerGrpcQueryClient(serverInstance.getHostname(), serverInstance.getGrpcPort(), _config));
}
Expand All @@ -174,15 +172,15 @@ private FailureDetector.ServerState retryUnhealthyServer(String instanceId) {
return FailureDetector.ServerState.UNHEALTHY;
}

String hostnamePort = _streamingQueryClient._instanceIdToHostnamePortMap.get(instanceId);
String hostnamePort = String.format("%s_%d", serverInstance.getHostname(), serverInstance.getGrpcPort());
ServerGrpcQueryClient client = _streamingQueryClient._grpcQueryClientMap.get(hostnamePort);

// Could occur if the cluster is only serving multi-stage queries
if (hostnamePort == null) {
if (client == null) {
LOGGER.debug("No GrpcQueryClient found for server with instanceId: {}", instanceId);
return FailureDetector.ServerState.UNKNOWN;
}

ServerGrpcQueryClient client = _streamingQueryClient._grpcQueryClientMap.get(hostnamePort);

ConnectivityState connectivityState = client.getChannel().getState(true);
if (connectivityState == ConnectivityState.READY) {
LOGGER.info("Successfully connected to server: {}", instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,6 @@ public FailureDetector.ServerState retryUnhealthyServer(String instanceId) {
return FailureDetector.ServerState.UNHEALTHY;
}

return _queryDispatcher.checkConnectivityToInstance(instanceId);
return _queryDispatcher.checkConnectivityToInstance(serverInstance);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.util.DataBlockExtractUtils;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.query.mailbox.MailboxService;
Expand Down Expand Up @@ -104,7 +105,6 @@ public class QueryDispatcher {
private final MailboxService _mailboxService;
private final ExecutorService _executorService;
private final Map<String, DispatchClient> _dispatchClientMap = new ConcurrentHashMap<>();
private final Map<String, String> _instanceIdToHostnamePortMap = new ConcurrentHashMap<>();
private final Map<String, TimeSeriesDispatchClient> _timeSeriesDispatchClientMap = new ConcurrentHashMap<>();
@Nullable
private final TlsConfig _tlsConfig;
Expand Down Expand Up @@ -217,22 +217,25 @@ void submit(
}
}

public FailureDetector.ServerState checkConnectivityToInstance(String instanceId) {
String hostnamePort = _instanceIdToHostnamePortMap.get(instanceId);
public FailureDetector.ServerState checkConnectivityToInstance(ServerInstance serverInstance) {
String hostname = serverInstance.getHostname();
int port = serverInstance.getQueryServicePort();
String hostnamePort = String.format("%s_%d", hostname, port);

DispatchClient client = _dispatchClientMap.get(hostnamePort);
// Could occur if the cluster is only serving single-stage queries
if (hostnamePort == null) {
LOGGER.debug("No DispatchClient found for server with instanceId: {}", instanceId);
if (client == null) {
LOGGER.debug("No DispatchClient found for server with instanceId: {}", serverInstance.getInstanceId());
return FailureDetector.ServerState.UNKNOWN;
}

DispatchClient client = _dispatchClientMap.get(hostnamePort);
ConnectivityState connectivityState = client.getChannel().getState(true);
if (connectivityState == ConnectivityState.READY) {
LOGGER.info("Successfully connected to server: {}", instanceId);
LOGGER.info("Successfully connected to server: {}", serverInstance.getInstanceId());
return FailureDetector.ServerState.HEALTHY;
} else {
LOGGER.info("Still can't connect to server: {}, current state: {}", instanceId, connectivityState);
LOGGER.info("Still can't connect to server: {}, current state: {}", serverInstance.getInstanceId(),
connectivityState);
return FailureDetector.ServerState.UNHEALTHY;
}
}
Expand Down Expand Up @@ -444,7 +447,6 @@ private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServer
String hostname = queryServerInstance.getHostname();
int port = queryServerInstance.getQueryServicePort();
String hostnamePort = String.format("%s_%d", hostname, port);
_instanceIdToHostnamePortMap.put(queryServerInstance.getInstanceId(), hostnamePort);
return _dispatchClientMap.computeIfAbsent(hostnamePort, k -> new DispatchClient(hostname, port, _tlsConfig));
}

Expand Down Expand Up @@ -547,7 +549,6 @@ public void shutdown() {
dispatchClient.getChannel().shutdown();
}
_dispatchClientMap.clear();
_instanceIdToHostnamePortMap.clear();
_mailboxService.shutdown();
_executorService.shutdown();
}
Expand Down
Loading