Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LI-CHERRY-PICK] [e4215c1] KAFKA-8325, KAFKA-8202; Remove batch from in-flight requests on MESSAGE_TOO_LARGE errors (#7176) #36

Open
wants to merge 1 commit into
base: 2.0-li
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public List<ProducerBatch> inFlightBatches(TopicPartition tp) {
return inFlightBatches.containsKey(tp) ? inFlightBatches.get(tp) : new ArrayList<>();
}

public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
private void maybeRemoveFromInflightBatches(ProducerBatch batch) {
List<ProducerBatch> batches = inFlightBatches.get(batch.topicPartition);
if (batches != null) {
batches.remove(batch);
Expand All @@ -171,6 +171,11 @@ public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
}
}

private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
maybeRemoveFromInflightBatches(batch);
this.accumulator.deallocate(batch);
}

/**
* Get the in-flight batches that has reached delivery timeout.
*/
Expand Down Expand Up @@ -593,7 +598,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
if (transactionManager != null)
transactionManager.removeInFlightBatch(batch);
this.accumulator.splitAndReenqueue(batch);
this.accumulator.deallocate(batch);
maybeRemoveAndDeallocateBatch(batch);
this.sensors.recordBatchSplit();
maybeRemoveFromInflightBatches(batch);
} else if (error != Errors.NONE) {
Expand Down Expand Up @@ -678,8 +683,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
}

if (batch.done(response.baseOffset, response.logAppendTime, null)) {
maybeRemoveFromInflightBatches(batch);
this.accumulator.deallocate(batch);
maybeRemoveAndDeallocateBatch(batch);
}
}

Expand Down Expand Up @@ -718,8 +722,7 @@ private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime,
this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);

if (batch.done(baseOffset, logAppendTime, exception)) {
maybeRemoveFromInflightBatches(batch);
this.accumulator.deallocate(batch);
maybeRemoveAndDeallocateBatch(batch);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2043,7 +2043,7 @@ public void testExpiredBatchDoesNotRetry() throws Exception {
@Test
public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
long deliverTimeoutMs = 1500L;
// create a producer batch with more than one record so it is eligible to split
// create a producer batch with more than one record so it is eligible for splitting
Future<RecordMetadata> request1 =
accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
MAX_BLOCK_TIMEOUT).future;
Expand Down Expand Up @@ -2138,6 +2138,181 @@ public void testExpiredBatchesInMultiplePartitions() throws Exception {
assertTrue(e.getCause() instanceof TimeoutException);
}
}
/************************ Uncomment this after we sync with upstream on 2.3.1 ****************************************
@Test
public void testTransactionalRequestsSentOnShutdown() {
// create a sender with retries = 1
int maxRetries = 1;
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
TransactionManager txnManager = new TransactionManager(logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);

ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testTransactionalRequestsSentOnShutdown", 1);

setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);

txnManager.beginTransaction();
txnManager.failIfNotReadyForSend();
txnManager.maybeAddPartitionToTransaction(tp);
client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
sender.runOnce();
sender.initiateClose();
txnManager.beginCommit();
AssertEndTxnRequestMatcher endTxnMatcher = new AssertEndTxnRequestMatcher(TransactionResult.COMMIT);
client.prepareResponse(endTxnMatcher, new EndTxnResponse(0, Errors.NONE));
sender.run();
assertTrue("Response didn't match in test", endTxnMatcher.matched);
} finally {
m.close();
}
}

@Test
public void testIncompleteTransactionAbortOnShutdown() {
// create a sender with retries = 1
int maxRetries = 1;
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
TransactionManager txnManager = new TransactionManager(logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);

ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testIncompleteTransactionAbortOnShutdown", 1);

setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);

txnManager.beginTransaction();
txnManager.failIfNotReadyForSend();
txnManager.maybeAddPartitionToTransaction(tp);
client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
sender.runOnce();
sender.initiateClose();
AssertEndTxnRequestMatcher endTxnMatcher = new AssertEndTxnRequestMatcher(TransactionResult.ABORT);
client.prepareResponse(endTxnMatcher, new EndTxnResponse(0, Errors.NONE));
sender.run();
assertTrue("Response didn't match in test", endTxnMatcher.matched);
} finally {
m.close();
}
}

@Test(timeout = 10000L)
public void testForceShutdownWithIncompleteTransaction() {
// create a sender with retries = 1
int maxRetries = 1;
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
TransactionManager txnManager = new TransactionManager(logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);

ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testForceShutdownWithIncompleteTransaction", 1);

setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);

txnManager.beginTransaction();
txnManager.failIfNotReadyForSend();
txnManager.maybeAddPartitionToTransaction(tp);
client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
sender.runOnce();

// Try to commit the transaction but it won't happen as we'll forcefully close the sender
TransactionalRequestResult commitResult = txnManager.beginCommit();

sender.forceClose();
sender.run();
assertThrows("The test expected to throw a KafkaException for forcefully closing the sender",
KafkaException.class, commitResult::await);
} finally {
m.close();
}
}

@Test
public void testDoNotPollWhenNoRequestSent() {
client = spy(new MockClient(time, metadata));

TransactionManager txnManager = new TransactionManager(logContext, "testDoNotPollWhenNoRequestSent", 6000, 100);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);

// doInitTransactions calls sender.doOnce three times, only two requests are sent, so we should only poll twice
verify(client, times(2)).poll(eq(RETRY_BACKOFF_MS), anyLong());
}
****************************** Uncomment this after we sync with upstream on 2.3.1 ***********************************/
@Test
public void testTooLargeBatchesAreSafelyRemoved() throws InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100);

setupWithTransactionState(txnManager, false, null);
doInitTransactions(txnManager, producerIdAndEpoch);

txnManager.beginTransaction();
txnManager.maybeAddPartitionToTransaction(tp0);
client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE)));
sender.run(time.milliseconds());

// create a producer batch with more than one record so it is eligible for splitting
Future<RecordMetadata> request1 =
accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
MAX_BLOCK_TIMEOUT).future;
Future<RecordMetadata> request2 =
accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
MAX_BLOCK_TIMEOUT).future;

// send request
sender.run(time.milliseconds());
assertEquals(1, sender.inFlightBatches(tp0).size());
// return a MESSAGE_TOO_LARGE error
client.respond(produceResponse(tp0, -1, Errors.MESSAGE_TOO_LARGE, -1));
sender.run(time.milliseconds());

// process retried response
sender.run(time.milliseconds());
client.respond(produceResponse(tp0, 0, Errors.NONE, 0));
sender.run(time.milliseconds());

// In-flight batches should be empty. Sleep past the expiration time of the batch and run once, no error should be thrown
assertEquals(0, sender.inFlightBatches(tp0).size());
time.sleep(2000);
sender.run(time.milliseconds());
}

/************************ Uncomment this after we sync with upstream on 2.3.1 ****************************************
class AssertEndTxnRequestMatcher implements MockClient.RequestMatcher {

private TransactionResult requiredResult;
private boolean matched = false;

AssertEndTxnRequestMatcher(TransactionResult requiredResult) {
this.requiredResult = requiredResult;
}

@Override
public boolean matches(AbstractRequest body) {
if (body instanceof EndTxnRequest) {
assertSame(requiredResult, ((EndTxnRequest) body).command());
matched = true;
return true;
} else {
return false;
}
}
}
****************************** Uncomment this after we sync with upstream on 2.3.1 ***********************************/

private class MatchingBufferPool extends BufferPool {
IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;
Expand Down