KAFKA-18645: New consumer should align close timeout handling with classic consumer #18702

Merged
merged 1 commit on Feb 5, 2025
@@ -1782,6 +1782,13 @@ public void close() {
* timeout. If the consumer is unable to complete offset commits and gracefully leave the group
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
* used to interrupt close.
* <p>
* The actual maximum wait time is bounded by the {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting, which
* only applies to operations performed with the broker (coordinator-related requests and
* fetch sessions). Even if a larger timeout is specified, the consumer will not wait longer than
* {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation.
* Note that the execution time of callbacks (such as {@link OffsetCommitCallback} and
* {@link ConsumerRebalanceListener}) does not consume time from the close timeout.
*
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
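
As a usage sketch of the documented behaviour (the broker address, topic, group id, and the 30s request timeout below are placeholder values assumed for illustration, not taken from this PR):

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CloseTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");             // placeholder group
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);            // assumed value
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("example-topic"));                           // placeholder topic
        consumer.poll(Duration.ofMillis(100));

        // A generous 5 minute close timeout is requested, but each broker-bound request issued
        // during close (offset commit, leave group) waits at most request.timeout.ms (30s here).
        // Time spent in OffsetCommitCallback or ConsumerRebalanceListener callbacks is not
        // charged against the close timeout.
        consumer.close(Duration.ofMinutes(5));
    }
}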
@@ -246,6 +246,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
private final ConsumerMetadata metadata;
private final Metrics metrics;
private final long retryBackoffMs;
private final int requestTimeoutMs;
private final Duration defaultApiTimeoutMs;
private final boolean autoCommitEnabled;
private volatile boolean closed = false;
@@ -324,6 +325,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
this.metrics = createMetrics(config, time, reporters);
this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);

List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
this.interceptors = new ConsumerInterceptors<>(interceptorList, metrics);
@@ -447,6 +449,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
SubscriptionState subscriptions,
ConsumerMetadata metadata,
long retryBackoffMs,
int requestTimeoutMs,
int defaultApiTimeoutMs,
String groupId,
boolean autoCommitEnabled) {
@@ -466,6 +469,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
this.groupMetadata.set(initializeGroupMetadata(groupId, Optional.empty()));
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
this.deserializers = deserializers;
this.applicationEventHandler = applicationEventHandler;
@@ -499,6 +503,7 @@ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
this.interceptors = new ConsumerInterceptors<>(Collections.emptyList(), metrics);
this.metadata = metadata;
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs = Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
this.clientTelemetryReporter = Optional.empty();
@@ -1326,7 +1331,7 @@ private void close(Duration timeout, boolean swallowException) {
// We are already closing with a timeout, don't allow wake-ups from here on.
wakeupTrigger.disableWakeups();

-final Timer closeTimer = time.timer(timeout);
+final Timer closeTimer = createTimerForCloseRequests(timeout);
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
closeTimer.update();
// Prepare shutting down the network thread
@@ -1337,7 +1342,7 @@ private void close(Duration timeout, boolean swallowException) {
swallow(log, Level.ERROR, "Failed to stop finding coordinator",
this::stopFindCoordinatorOnClose, firstException);
swallow(log, Level.ERROR, "Failed to release group assignment",
-() -> runRebalanceCallbacksOnClose(closeTimer), firstException);
+this::runRebalanceCallbacksOnClose, firstException);
swallow(log, Level.ERROR, "Failed to leave group while closing consumer",
() -> leaveGroupOnClose(closeTimer), firstException);
swallow(log, Level.ERROR, "Failed invoking asynchronous commit callbacks while closing consumer",
@@ -1368,6 +1373,12 @@ private void close(Duration timeout, boolean swallowException) {
}
}

private Timer createTimerForCloseRequests(Duration timeout) {
// this.time could be null if an exception occurs in constructor prior to setting the this.time field
final Time time = (this.time == null) ? Time.SYSTEM : this.time;
return time.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
}
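
For intuition, the cap works out as follows (the request timeout of 30000 ms is an assumed value, not taken from this change):

// requestTimeoutMs = 30_000 (assumed)
// close(Duration.ofMinutes(2)) -> timer of min(120_000, 30_000) = 30_000 ms
// close(Duration.ofSeconds(5)) -> timer of min(5_000, 30_000)   =  5_000 ms
// close(Duration.ZERO)         -> timer of min(0, 30_000)       =      0 ms (do not wait)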

private void autoCommitOnClose(final Timer timer) {
if (groupMetadata.get().isEmpty())
return;
@@ -1378,7 +1389,7 @@ private void autoCommitOnClose(final Timer timer) {
applicationEventHandler.add(new CommitOnCloseEvent());
}

-private void runRebalanceCallbacksOnClose(final Timer timer) {
+private void runRebalanceCallbacksOnClose() {
if (groupMetadata.get().isEmpty())
return;

@@ -1393,19 +1404,15 @@ private void runRebalanceCallbacksOnClose(final Timer timer) {
SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
droppedPartitions.addAll(assignedPartitions);

-        try {
-            final Exception error;
+        final Exception error;

-            if (memberEpoch > 0)
-                error = rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
-            else
-                error = rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
+        if (memberEpoch > 0)
+            error = rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
+        else
+            error = rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);

-            if (error != null)
-                throw ConsumerUtils.maybeWrapAsKafkaException(error);
-        } finally {
-            timer.update();
-        }
+        if (error != null)
+            throw ConsumerUtils.maybeWrapAsKafkaException(error);
}
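
Since listener callbacks no longer draw down the close timer, a listener like the following can take as long as it needs in onPartitionsRevoked or onPartitionsLost without eating into the time reserved for the leave-group request. A minimal sketch; the topic name, close timeout, and logging bodies are purely illustrative:

import java.time.Duration;
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class CloseCallbackExample {
    static void subscribeAndClose(Consumer<String, String> consumer) {
        consumer.subscribe(List.of("example-topic"), new ConsumerRebalanceListener() {   // placeholder topic
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Invoked from runRebalanceCallbacksOnClose() when the member epoch is > 0;
                // its runtime is not charged against the close timeout.
                System.out.println("Revoked on close: " + partitions);
            }

            @Override
            public void onPartitionsLost(Collection<TopicPartition> partitions) {
                // Invoked instead of onPartitionsRevoked when the member epoch is <= 0.
                System.out.println("Lost on close: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Assigned: " + partitions);
            }
        });
        consumer.close(Duration.ofSeconds(30));
    }
}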

private void leaveGroupOnClose(final Timer timer) {
@@ -245,6 +245,7 @@ private AsyncKafkaConsumer<String, String> newConsumer(
String clientId,
boolean autoCommitEnabled) {
long retryBackoffMs = 100L;
int requestTimeoutMs = 30000;
int defaultApiTimeoutMs = 1000;
return new AsyncKafkaConsumer<>(
new LogContext(),
@@ -262,6 +263,7 @@ private AsyncKafkaConsumer<String, String> newConsumer(
subscriptions,
metadata,
retryBackoffMs,
requestTimeoutMs,
defaultApiTimeoutMs,
groupId,
autoCommitEnabled);
@@ -240,7 +240,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testClose(quorum: String, groupProtocol: String): Unit = {
val numRecords = 10
val producer = createProducer()
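
With this change testClose runs under both the classic and the new consumer group protocol. On the client side the protocol is chosen via the group.protocol setting; a fragment as a reminder (assumes the usual Properties/ConsumerConfig imports, broker and group id are placeholders):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");             // placeholder
// "classic" selects the original group protocol, "consumer" the new protocol
// implemented by AsyncKafkaConsumer.
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");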