From 6f51b16a7a0f3d8b0f7ede7f614f4acf6ec98d80 Mon Sep 17 00:00:00 2001 From: frankvicky Date: Wed, 29 Jan 2025 15:16:58 +0800 Subject: [PATCH] KAFKA-18641: AsyncKafkaConsumer could lose records with auto offset commit JIRA: KAFKA-18641 Please refer to jira ticket for further details. In short, application thread advances positions but allConsumed is called by background thread. This will lead to inconsistent between committed offset and actual consumed records --- .../internals/CommitRequestManager.java | 29 ++++++---- .../events/ApplicationEventProcessor.java | 21 ++++++- .../internals/CommitRequestManagerTest.java | 56 +++++++++++-------- .../events/ApplicationEventProcessorTest.java | 6 +- 4 files changed, 72 insertions(+), 40 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 1d3503886a9c4..ae7a10cb67580 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -71,7 +71,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener { private final Time time; - private final SubscriptionState subscriptions; private final ConsumerMetadata metadata; private final LogContext logContext; private final Logger log; @@ -88,6 +87,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener private final boolean throwOnFetchStableOffsetUnsupported; final PendingRequests pendingRequests; private boolean closing = false; + private Map latestPartitionOffsets = Collections.emptyMap(); /** * Last member epoch sent in a commit request. Empty if no epoch was included in the last @@ -158,7 +158,6 @@ public CommitRequestManager( this.coordinatorRequestManager = coordinatorRequestManager; this.groupId = groupId; this.groupInstanceId = groupInstanceId; - this.subscriptions = subscriptions; this.metadata = metadata; this.retryBackoffMs = retryBackoffMs; this.retryBackoffMaxMs = retryBackoffMaxMs; @@ -267,7 +266,7 @@ private CompletableFuture> requestAutoCom public void maybeAutoCommitAsync() { if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) { OffsetCommitRequestState requestState = createOffsetCommitRequest( - subscriptions.allConsumed(), + latestPartitionOffsets, Long.MAX_VALUE); CompletableFuture> result = requestAutoCommit(requestState); // Reset timer to the interval (even if no request was generated), but ensure that if @@ -324,7 +323,7 @@ public CompletableFuture maybeAutoCommitSyncBeforeRevocation(final long de CompletableFuture result = new CompletableFuture<>(); OffsetCommitRequestState requestState = - createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs); + createOffsetCommitRequest(latestPartitionOffsets, deadlineMs); autoCommitSyncBeforeRevocationWithRetries(requestState, result); return result; } @@ -348,7 +347,7 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState log.debug("Member {} will retry auto-commit of latest offsets after receiving retriable error {}", memberInfo.memberId, error.getMessage()); - requestAttempt.offsets = subscriptions.allConsumed(); + requestAttempt.offsets = latestPartitionOffsets; requestAttempt.resetFuture(); autoCommitSyncBeforeRevocationWithRetries(requestAttempt, result); } @@ -389,7 +388,7 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState * future will be completed with a {@link RetriableCommitFailedException}. */ public CompletableFuture> commitAsync(final Optional> offsets) { - Map commitOffsets = offsets.orElseGet(subscriptions::allConsumed); + Map commitOffsets = offsets.orElseGet(this::latestPartitionOffsets); if (commitOffsets.isEmpty()) { log.debug("Skipping commit of empty offsets"); return CompletableFuture.completedFuture(Map.of()); @@ -417,15 +416,15 @@ public CompletableFuture> commitAsync(fin * an expected retriable error. * @return Future that will complete when a successful response */ - public CompletableFuture> commitSync(final Optional> offsets, + public CompletableFuture> commitSync(final Map offsets, final long deadlineMs) { - Map commitOffsets = offsets.orElseGet(subscriptions::allConsumed); - if (commitOffsets.isEmpty()) { + Objects.requireNonNull(offsets); + if (offsets.isEmpty()) { return CompletableFuture.completedFuture(Map.of()); } - maybeUpdateLastSeenEpochIfNewer(commitOffsets); + maybeUpdateLastSeenEpochIfNewer(offsets); CompletableFuture> result = new CompletableFuture<>(); - OffsetCommitRequestState requestState = createOffsetCommitRequest(commitOffsets, deadlineMs); + OffsetCommitRequestState requestState = createOffsetCommitRequest(offsets, deadlineMs); commitSyncWithRetries(requestState, result); return result; } @@ -637,6 +636,14 @@ private void maybeUpdateLastSeenEpochIfNewer(final Map latestPartitionOffsets() { + return latestPartitionOffsets; + } + + public void setLatestPartitionOffsets(Map offsets) { + this.latestPartitionOffsets = Collections.unmodifiableMap(offsets); + } + class OffsetCommitRequestState extends RetriableRequestState { private Map offsets; private final String groupId; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 69cc0072a39b1..9a8e21fe3c708 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -207,7 +207,8 @@ public void process(ApplicationEvent event) { private void process(final PollEvent event) { if (requestManagers.commitRequestManager.isPresent()) { - requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs())); + CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); + commitRequestManager.updateAutoCommitTimer(event.pollTimeMs()); requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { hrm.membershipManager().onConsumerPoll(); hrm.resetPollTimer(event.pollTimeMs()); @@ -250,7 +251,10 @@ private void process(final SyncCommitEvent event) { try { CommitRequestManager manager = requestManagers.commitRequestManager.get(); - CompletableFuture> future = manager.commitSync(event.offsets(), event.deadlineMs()); + CompletableFuture> future = manager.commitSync( + event.offsets().orElseGet(subscriptions::allConsumed), + event.deadlineMs() + ); future.whenComplete(complete(event.future())); } catch (Exception e) { event.future().completeExceptionally(e); @@ -414,7 +418,16 @@ private void process(final ResetOffsetEvent event) { */ private void process(final CheckAndUpdatePositionsEvent event) { CompletableFuture future = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs()); - future.whenComplete(complete(event.future())); + final CompletableFuture b = event.future(); + future.whenComplete((BiConsumer) (value, exception) -> { + if (exception != null) + b.completeExceptionally(exception); + else { + requestManagers.commitRequestManager.ifPresent(commitRequestManager -> + commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed())); + b.complete(value); + } + }); } private void process(final TopicMetadataEvent event) { @@ -580,6 +593,8 @@ private void process(final SeekUnvalidatedEvent event) { metadata.currentLeader(event.partition()) ); subscriptions.seekUnvalidated(event.partition(), newPosition); + requestManagers.commitRequestManager.ifPresent(commitRequestManager -> + commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed())); event.future().complete(null); } catch (Exception e) { event.future().completeExceptionally(e); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 1b7ed8599238d..cf43fd1fcc0ea 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -92,7 +92,6 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.clearInvocations; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -220,8 +219,10 @@ public void testPollEnsureAutocommitSent() { assertPoll(0, commitRequestManager); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); time.sleep(100); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); List pollResults = assertPoll(1, commitRequestManager); pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse( "t1", @@ -248,9 +249,9 @@ public void testPollEnsureCorrectInflightRequestBufferSize() { // Add the requests to the CommitRequestManager and store their futures long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; - commitManager.commitSync(Optional.of(offsets1), deadlineMs); + commitManager.commitSync(offsets1, deadlineMs); commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), deadlineMs); - commitManager.commitSync(Optional.of(offsets2), deadlineMs); + commitManager.commitSync(offsets2, deadlineMs); commitManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 1)), deadlineMs); // Poll the CommitRequestManager and verify that the inflightOffsetFetches size is correct @@ -300,7 +301,7 @@ public void testCommitSync() { CommitRequestManager commitRequestManager = create(false, 100); CompletableFuture> future = commitRequestManager.commitSync( - Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs); + offsets, time.milliseconds() + defaultApiTimeoutMs); assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size()); List pollResults = assertPoll(1, commitRequestManager); pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse( @@ -323,11 +324,10 @@ public void testCommitSyncWithEmptyOffsets() { TopicPartition tp = new TopicPartition("topic", 1); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, Optional.of(1), ""); Map offsets = Map.of(tp, offsetAndMetadata); - doReturn(offsets).when(subscriptionState).allConsumed(); CommitRequestManager commitRequestManager = create(false, 100); CompletableFuture> future = commitRequestManager.commitSync( - Optional.empty(), time.milliseconds() + defaultApiTimeoutMs); + offsets, time.milliseconds() + defaultApiTimeoutMs); assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size()); List pollResults = assertPoll(1, commitRequestManager); pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse( @@ -336,7 +336,6 @@ public void testCommitSyncWithEmptyOffsets() { (short) 1, Errors.NONE))); - verify(subscriptionState).allConsumed(); verify(metadata).updateLastSeenEpochIfNewer(tp, 1); Map commitOffsets = assertDoesNotThrow(() -> future.get()); assertTrue(future.isDone()); @@ -344,15 +343,13 @@ public void testCommitSyncWithEmptyOffsets() { } @Test - public void testCommitSyncWithEmptyAllConsumedOffsets() { + public void testCommitSyncWithEmptyLatestPartitionOffsetsOffsets() { subscriptionState = mock(SubscriptionState.class); - doReturn(Map.of()).when(subscriptionState).allConsumed(); CommitRequestManager commitRequestManager = create(true, 100); CompletableFuture> future = commitRequestManager.commitSync( - Optional.empty(), time.milliseconds() + defaultApiTimeoutMs); + Collections.emptyMap(), time.milliseconds() + defaultApiTimeoutMs); - verify(subscriptionState).allConsumed(); Map commitOffsets = assertDoesNotThrow(() -> future.get()); assertTrue(future.isDone()); assertTrue(commitOffsets.isEmpty()); @@ -390,9 +387,9 @@ public void testCommitAsyncWithEmptyOffsets() { TopicPartition tp = new TopicPartition("topic", 1); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0, Optional.of(1), ""); Map offsets = Map.of(tp, offsetAndMetadata); - doReturn(offsets).when(subscriptionState).allConsumed(); CommitRequestManager commitRequestManager = create(true, 100); + commitRequestManager.setLatestPartitionOffsets(offsets); CompletableFuture> future = commitRequestManager.commitAsync(Optional.empty()); assertEquals(1, commitRequestManager.unsentOffsetCommitRequests().size()); List pollResults = assertPoll(1, commitRequestManager); @@ -402,7 +399,6 @@ public void testCommitAsyncWithEmptyOffsets() { (short) 1, Errors.NONE))); - verify(subscriptionState).allConsumed(); verify(metadata).updateLastSeenEpochIfNewer(tp, 1); assertTrue(future.isDone()); Map commitOffsets = assertDoesNotThrow(() -> future.get()); @@ -410,14 +406,13 @@ public void testCommitAsyncWithEmptyOffsets() { } @Test - public void testCommitAsyncWithEmptyAllConsumedOffsets() { + public void testCommitAsyncWithEmptyLatestPartitionOffsetsOffsets() { subscriptionState = mock(SubscriptionState.class); - doReturn(Map.of()).when(subscriptionState).allConsumed(); CommitRequestManager commitRequestManager = create(true, 100); + commitRequestManager.setLatestPartitionOffsets(Collections.emptyMap()); CompletableFuture> future = commitRequestManager.commitAsync(Optional.empty()); - verify(subscriptionState).allConsumed(); assertTrue(future.isDone()); Map commitOffsets = assertDoesNotThrow(() -> future.get()); assertTrue(commitOffsets.isEmpty()); @@ -434,6 +429,7 @@ public void testAsyncAutocommitNotRetriedAfterException() { subscriptionState.seek(tp, 100); time.sleep(commitInterval); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); List futures = assertPoll(1, commitRequestManager); // Complete the autocommit request exceptionally. It should fail right away, without retry. futures.get(0).onComplete(mockOffsetCommitResponse( @@ -447,13 +443,16 @@ public void testAsyncAutocommitNotRetriedAfterException() { // retried). time.sleep(retryBackoffMs); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); assertPoll(0, commitRequestManager); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); // Only when polling after the auto-commit interval, a new auto-commit request should be // generated. time.sleep(commitInterval); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); futures = assertPoll(1, commitRequestManager); assertEmptyPendingRequests(commitRequestManager); futures.get(0).onComplete(mockOffsetCommitResponse( @@ -475,7 +474,7 @@ public void testCommitSyncRetriedAfterExpectedRetriableException(Errors error) { new TopicPartition("topic", 1), new OffsetAndMetadata(0)); long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; - CompletableFuture> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs); + CompletableFuture> commitResult = commitRequestManager.commitSync(offsets, deadlineMs); sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManager, error, commitResult); // We expect that request should have been retried on this sync commit. @@ -501,7 +500,7 @@ public void testCommitSyncFailsWithCommitFailedExceptionIfUnknownMemberId() { new TopicPartition("topic", 1), new OffsetAndMetadata(0)); long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; - CompletableFuture> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs); + CompletableFuture> commitResult = commitRequestManager.commitSync(offsets, deadlineMs); completeOffsetCommitRequestWithError(commitRequestManager, Errors.UNKNOWN_MEMBER_ID); NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); @@ -521,7 +520,7 @@ public void testCommitSyncFailsWithCommitFailedExceptionOnStaleMemberEpoch() { // Send commit request expected to be retried on retriable errors CompletableFuture> commitResult = commitRequestManager.commitSync( - Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs); + offsets, time.milliseconds() + defaultApiTimeoutMs); completeOffsetCommitRequestWithError(commitRequestManager, Errors.STALE_MEMBER_EPOCH); NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); assertEquals(0, res.unsentRequests.size()); @@ -545,6 +544,7 @@ public void testAutoCommitAsyncFailsWithStaleMemberEpochContinuesToCommitOnTheIn TopicPartition t1p = new TopicPartition("topic1", 0); subscriptionState.assignFromUser(singleton(t1p)); subscriptionState.seek(t1p, 10); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); // Async commit on the interval fails with fatal stale epoch and just resets the timer to // the interval @@ -558,6 +558,7 @@ public void testAutoCommitAsyncFailsWithStaleMemberEpochContinuesToCommitOnTheIn "interval expires"); time.sleep(100); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); res = commitRequestManager.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); @@ -596,6 +597,7 @@ public void testAutocommitEnsureOnlyOneInflightRequest() { CommitRequestManager commitRequestManager = create(true, 100); time.sleep(100); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); List futures = assertPoll(1, commitRequestManager); time.sleep(100); @@ -621,6 +623,7 @@ public void testAutoCommitBeforeRevocationNotBlockedByAutoCommitOnIntervalInflig CommitRequestManager commitRequestManager = create(true, 100); time.sleep(100); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); NetworkClientDelegate.FutureCompletionHandler autoCommitOnInterval = @@ -647,6 +650,7 @@ public void testAutocommitInterceptorsInvoked() { CommitRequestManager commitRequestManager = create(true, 100); time.sleep(100); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); List futures = assertPoll(1, commitRequestManager); // complete the unsent request to trigger interceptor @@ -665,6 +669,7 @@ public void testAutocommitInterceptorsNotInvokedOnError() { CommitRequestManager commitRequestManager = create(true, 100); time.sleep(100); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); List futures = assertPoll(1, commitRequestManager); // complete the unsent request to trigger interceptor @@ -693,6 +698,7 @@ public void testAutoCommitEmptyDoesNotLeaveInflightRequestFlagOn() { // Auto-commit of empty offsets time.sleep(100); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); commitRequestManager.maybeAutoCommitAsync(); // Next auto-commit consumed offsets (not empty). Should generate a request, ensuring @@ -700,6 +706,7 @@ public void testAutoCommitEmptyDoesNotLeaveInflightRequestFlagOn() { subscriptionState.seek(t1p, 100); time.sleep(100); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); commitRequestManager.maybeAutoCommitAsync(); assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetCommits.size()); @@ -717,6 +724,7 @@ public void testAutoCommitOnIntervalSkippedIfPreviousOneInFlight() { // Send auto-commit request that will remain in-flight without a response time.sleep(100); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); commitRequestManager.maybeAutoCommitAsync(); List futures = assertPoll(1, commitRequestManager); assertEquals(1, futures.size()); @@ -949,7 +957,7 @@ public void testOffsetCommitSyncTimeoutNotReturnedOnPollAndFails() { // Send sync offset commit request that fails with retriable error. long deadlineMs = time.milliseconds() + retryBackoffMs * 2; - CompletableFuture> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs); + CompletableFuture> commitResult = commitRequestManager.commitSync(offsets, deadlineMs); completeOffsetCommitRequestWithError(commitRequestManager, Errors.REQUEST_TIMED_OUT); // Request retried after backoff, and fails with retriable again. Should not complete yet @@ -984,7 +992,7 @@ public void testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExp // Send offset commit request that fails with retriable error. long deadlineMs = time.milliseconds() + retryBackoffMs * 2; - CompletableFuture> commitResult = commitRequestManager.commitSync(Optional.of(offsets), deadlineMs); + CompletableFuture> commitResult = commitRequestManager.commitSync(offsets, deadlineMs); completeOffsetCommitRequestWithError(commitRequestManager, error); // Sleep to expire the request timeout. Request should fail on the next poll with a @@ -1036,7 +1044,7 @@ public void testOffsetCommitSingleFailedAttemptPerRequestWhenPartitionErrors(fin offsets.put(new TopicPartition("t1", 1), new OffsetAndMetadata(2)); offsets.put(new TopicPartition("t1", 2), new OffsetAndMetadata(3)); - commitRequestManager.commitSync(Optional.of(offsets), time.milliseconds() + defaultApiTimeoutMs); + commitRequestManager.commitSync(offsets, time.milliseconds() + defaultApiTimeoutMs); NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); @@ -1061,7 +1069,7 @@ public void testEnsureBackoffRetryOnOffsetCommitRequestTimeout() { new OffsetAndMetadata(0)); long deadlineMs = time.milliseconds() + defaultApiTimeoutMs; - commitRequestManager.commitSync(Optional.of(offsets), deadlineMs); + commitRequestManager.commitSync(offsets, deadlineMs); NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException()); @@ -1215,6 +1223,7 @@ public void testAutoCommitSyncBeforeRevocationRetriesOnRetriableAndStaleEpoch(Er subscriptionState.assignFromUser(singleton(tp)); subscriptionState.seek(tp, 5); long deadlineMs = time.milliseconds() + retryBackoffMs * 2; + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); // Send commit request expected to be retried on STALE_MEMBER_EPOCH error while it does not expire commitRequestManager.maybeAutoCommitSyncBeforeRevocation(deadlineMs); @@ -1267,6 +1276,7 @@ public void testLastEpochSentOnCommit() { TopicPartition tp = new TopicPartition("topic", 1); subscriptionState.assignFromUser(singleton(tp)); subscriptionState.seek(tp, 100); + commitRequestManager.setLatestPartitionOffsets(subscriptionState.allConsumed()); // Send auto commit to revoke partitions, expected to be retried on STALE_MEMBER_EPOCH // with the latest epochs received (using long deadline to avoid expiring the request diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 911c028f728b7..de702dc86fd17 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -448,10 +448,10 @@ public void testSyncCommitEvent(Optional> SyncCommitEvent event = new SyncCommitEvent(offsets, 12345); setupProcessor(true); - doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets, 12345); + doReturn(CompletableFuture.completedFuture(offsets.orElse(Map.of()))).when(commitRequestManager).commitSync(offsets.orElseGet(Collections::emptyMap), 12345); processor.process(event); - verify(commitRequestManager).commitSync(offsets, 12345); + verify(commitRequestManager).commitSync(offsets.orElseGet(Collections::emptyMap), 12345); Map committedOffsets = assertDoesNotThrow(() -> event.future().get()); assertEquals(offsets.orElse(Map.of()), committedOffsets); } @@ -475,7 +475,7 @@ public void testSyncCommitEventWithException() { doReturn(future).when(commitRequestManager).commitSync(any(), anyLong()); processor.process(event); - verify(commitRequestManager).commitSync(Optional.empty(), 12345); + verify(commitRequestManager).commitSync(Collections.emptyMap(), 12345); assertFutureThrows(event.future(), IllegalStateException.class); }