From a252374dca24ee514a74792ba973c71163bdc96a Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 28 Jan 2025 00:10:20 -0800 Subject: [PATCH 1/9] KAFKA-18659: librdkafka compressed produce fails unless api versions returns produce v0 --- .../kafka/common/protocol/Protocol.java | 7 +++- .../kafka/common/requests/ProduceRequest.java | 13 +++--- .../common/message/ProduceRequest.json | 7 +++- .../common/message/ProduceResponse.json | 7 +++- .../common/requests/ProduceRequestTest.java | 3 +- .../main/scala/kafka/server/KafkaApis.scala | 5 +++ .../kafka/server/ProduceRequestTest.scala | 40 ++++++++++++++++++- 7 files changed, 67 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 1459a90103012..dddee842edef9 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.TaggedFields; import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.requests.ProduceRequest; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -178,7 +179,9 @@ public static String toHtml() { // Requests b.append("Requests:
\n"); Schema[] requests = key.messageType.requestSchemas(); - for (short version = key.oldestVersion(); version <= key.latestVersion(); version++) { + // See `ProduceRequest.MIN_VERSION` for details on why we need to do this + short oldestVersion = key == ApiKeys.PRODUCE ? ProduceRequest.MIN_VERSION : key.oldestVersion(); + for (short version = oldestVersion; version <= key.latestVersion(); version++) { Schema schema = requests[version]; if (schema == null) throw new IllegalStateException("Unexpected null schema for " + key + " with version " + version); @@ -208,7 +211,7 @@ public static String toHtml() { // Responses b.append("Responses:
\n"); Schema[] responses = key.messageType.responseSchemas(); - for (int i = 0; i < responses.length; i++) { + for (int i = oldestVersion; i < responses.length; i++) { Schema schema = responses[i]; // Schema if (schema != null) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 8fbd86cb9bb43..5b76ed9a5ee27 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -40,6 +40,11 @@ import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET; public class ProduceRequest extends AbstractRequest { + // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka, + // these versions have to be included in the api versions response (see KAFKA-18659), which means we cannot exclude + // them from the protocol definition. Instead, we reject requests with such versions in `KafkaApis` by returning + // `UnsupportedVersion` errors. We also special case the generated protocol html to exclude versions 0-2. + public static final short MIN_VERSION = 3; public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11; public static Builder builder(ProduceRequestData data, boolean useTransactionV1Version) { @@ -47,7 +52,7 @@ public static Builder builder(ProduceRequestData data, boolean useTransactionV1V // LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 so that the broker knows that we're using transaction protocol V1. short maxVersion = useTransactionV1Version ? 
LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 : ApiKeys.PRODUCE.latestVersion(); - return new Builder(ApiKeys.PRODUCE.oldestVersion(), maxVersion, data); + return new Builder(MIN_VERSION, maxVersion, data); } public static Builder builder(ProduceRequestData data) { @@ -69,11 +74,6 @@ public ProduceRequest build(short version) { return build(version, true); } - // Visible for testing only - public ProduceRequest buildUnsafe(short version) { - return build(version, false); - } - private ProduceRequest build(short version, boolean validate) { if (validate) { // Validate the given records first @@ -244,4 +244,5 @@ public static ProduceRequest parse(ByteBuffer buffer, short version) { public static boolean isTransactionV2Requested(short version) { return version > LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2; } + } diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index db7d961f1373f..7f55cf09cb3ec 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -18,7 +18,10 @@ "type": "request", "listeners": ["broker"], "name": "ProduceRequest", - // Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new baseline. + // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka, + // these versions have to be included in the api versions response (see KAFKA-18659), which means we cannot exclude + // them from `validVersions`. Instead, we reject requests with such versions in `KafkaApis` by returning + // `UnsupportedVersion` errors. // // Version 1 and 2 are the same as version 0. // @@ -44,7 +47,7 @@ // transaction V2 (KIP_890 part 2) is enabled, the produce request will also include the function for a // AddPartitionsToTxn call. If V2 is disabled, the client can't use produce request version higher than 11 within // a transaction. 
- "validVersions": "3-12", + "validVersions": "0-12", "flexibleVersions": "9+", "fields": [ { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId", diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 5c12539dfb118..cf67acaa82cbd 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -17,7 +17,10 @@ "apiKey": 0, "type": "response", "name": "ProduceResponse", - // Versions 0-2 were removed in Apache Kafka 4.0, Version 3 is the new baseline. + // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka, + // these versions have to be included in the api versions response (see KAFKA-18659), which means we cannot exclude + // them from `validVersions`. Instead, we reject requests with such versions in `KafkaApis` by returning + // `UnsupportedVersion` errors. // // Version 1 added the throttle time. // Version 2 added the log append time. @@ -38,7 +41,7 @@ // Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890). // // Version 12 is the same as version 10 (KIP-890). 
- "validVersions": "3-12", + "validVersions": "0-12", "flexibleVersions": "9+", "fields": [ { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java index ecb8869c38bd2..26bedc55e35b0 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java @@ -212,8 +212,7 @@ public void testV6AndBelowCannotUseZStdCompression() { .setAcks((short) 1) .setTimeoutMs(1000); // Can't create ProduceRequest instance with version within [3, 7) - for (short version = 3; version < 7; version++) { - + for (short version = ProduceRequest.MIN_VERSION; version < 7; version++) { ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, produceData); assertThrowsForAllVersions(requestBuilder, UnsupportedCompressionTypeException.class); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 18795a7e0f38d..c1afc5298b873 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -374,6 +374,11 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { val produceRequest = request.body[ProduceRequest] + // See `ProduceRequest.MIN_VERSION` for details on why we need to do this + if (produceRequest.version < ProduceRequest.MIN_VERSION) { + requestHelper.sendErrorResponseMaybeThrottle(request, Errors.UNSUPPORTED_VERSION.exception()) + return; + } if (RequestUtils.hasTransactionalRecords(produceRequest)) { val isAuthorizedTransactional = produceRequest.transactionalId != null && diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 64111f1487513..9ae2b35841fc8 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -32,6 +32,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource} import org.junit.jupiter.params.provider.ValueSource @@ -254,7 +255,7 @@ class ProduceRequestTest extends BaseRequestTest { // Create a single-partition topic compressed with ZSTD val topicConfig = new Properties topicConfig.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.ZSTD.name) - val partitionToLeader = createTopic(topic, topicConfig = topicConfig) + val partitionToLeader = createTopic(topic, topicConfig = topicConfig) val leader = partitionToLeader(partition) val memoryRecords = MemoryRecords.withRecords(Compression.zstd().build(), new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes)) @@ -283,6 +284,43 @@ class ProduceRequestTest extends BaseRequestTest { assertEquals(-1, partitionProduceResponse1.logAppendTimeMs) } + /** + * See `ProduceRequest.MIN_VERSION` for the details on why we need special handling for produce request v0-v2 (inclusive). 
+ */ + @Test + def testProduceRequestV0V1V2FailsWithUnsupportedVersion(): Unit = { + val topic = "topic" + val partition = 0 + val partitionToLeader = createTopic(topic) + val leader = partitionToLeader(partition) + val memoryRecords = MemoryRecords.withRecords(Compression.none().build(), + new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes)) + val topicPartition = new TopicPartition("topic", partition) + val partitionRecords = new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("topic").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData() + .setIndex(partition) + .setRecords(memoryRecords)))) + .iterator)) + .setAcks((-1).toShort) + .setTimeoutMs(3000) + .setTransactionalId(null) + + for (i <- 0 until ProduceRequest.MIN_VERSION) { + val version = i.toShort + val produceResponse1 = sendProduceRequest(leader, new ProduceRequest.Builder(version, version, partitionRecords).build()) + val topicProduceResponse1 = produceResponse1.data.responses.asScala.head + val partitionProduceResponse1 = topicProduceResponse1.partitionResponses.asScala.head + val tp1 = new TopicPartition(topicProduceResponse1.name, partitionProduceResponse1.index) + assertEquals(topicPartition, tp1) + assertEquals(Errors.UNSUPPORTED_VERSION, Errors.forCode(partitionProduceResponse1.errorCode)) + assertEquals(-1, partitionProduceResponse1.baseOffset) + assertEquals(-1, partitionProduceResponse1.logAppendTimeMs) + } + } + private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = { connectAndReceive[ProduceResponse](request, destination = brokerSocketServer(leaderId)) } From 8a5d335e844c92a5f70667f5913e0ba2ebe9aca6 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 30 Jan 2025 11:43:42 -0800 Subject: [PATCH 2/9] New approach. 
--- .../apache/kafka/common/protocol/ApiKeys.java | 23 +++++++--- .../kafka/common/protocol/Protocol.java | 43 +++++++++---------- .../common/requests/ApiVersionsResponse.java | 6 +-- .../kafka/common/requests/ProduceRequest.java | 8 +--- .../kafka/common/protocol/ApiKeysTest.java | 3 +- .../requests/ApiVersionsResponseTest.java | 7 ++- .../common/requests/ProduceRequestTest.java | 17 +++++++- .../main/scala/kafka/server/KafkaApis.scala | 5 --- .../AbstractApiVersionsRequestTest.scala | 5 +++ .../unit/kafka/server/BaseRequestTest.scala | 2 +- .../kafka/server/ProduceRequestTest.scala | 35 +++++++++------ 11 files changed, 91 insertions(+), 63 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 1f8a98554c243..df65eb254ebb4 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -149,6 +149,12 @@ public enum ApiKeys { private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) .collect(Collectors.toMap(key -> (int) key.id, Function.identity())); + // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka, + // these versions have to be included in the api versions response (see KAFKA-18659), which means we cannot exclude + // them from the protocol definition. Instead, we reject requests with such versions in `KafkaApis` by returning + // `UnsupportedVersion` errors. We also special case the generated protocol html to exclude versions 0-2. 
+ public static final short PRODUCE_OLDEST_VERSION = 3; + /** the permanent and immutable id of an API - this can't change ever */ public final short id; @@ -227,6 +233,9 @@ public short latestVersion(boolean enableUnstableLastVersion) { } public short oldestVersion() { + // See #PRODUCE_OLDEST_VERSION for details of why we do this + if (this == PRODUCE) + return PRODUCE_OLDEST_VERSION; return messageType.lowestSupportedVersion(); } @@ -264,8 +273,11 @@ public boolean hasValidVersion() { return oldestVersion() <= latestVersion(); } - public Optional toApiVersion(boolean enableUnstableLastVersion) { - short oldestVersion = oldestVersion(); + public Optional toApiVersion(boolean enableUnstableLastVersion, + Optional listenerType) { + // see `PRODUCE_OLDEST_VERSION` for details on why we do this + short oldestVersion = (this == PRODUCE && listenerType.map(l -> l == ApiMessageType.ListenerType.BROKER).orElse(false)) ? + messageType.lowestSupportedVersion() : oldestVersion(); short latestVersion = latestVersion(enableUnstableLastVersion); // API is entirely disabled if latestStableVersion is smaller than oldestVersion. 
@@ -299,7 +311,7 @@ static String toHtml() { b.append("Key\n"); b.append(""); clientApis().stream() - .filter(apiKey -> apiKey.toApiVersion(false).isPresent()) + .filter(apiKey -> apiKey.toApiVersion(false, Optional.empty()).isPresent()) .forEach(apiKey -> { b.append("\n"); b.append(""); @@ -341,10 +353,7 @@ public static EnumSet controllerApis() { } public static EnumSet clientApis() { - List apis = Arrays.stream(ApiKeys.values()) - .filter(apiKey -> apiKey.inScope(ApiMessageType.ListenerType.BROKER)) - .collect(Collectors.toList()); - return EnumSet.copyOf(apis); + return brokerApis(); } public static EnumSet apisForListener(ApiMessageType.ListenerType listener) { diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index dddee842edef9..237948f61c97d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.TaggedFields; import org.apache.kafka.common.protocol.types.Type; -import org.apache.kafka.common.requests.ProduceRequest; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -179,9 +178,7 @@ public static String toHtml() { // Requests b.append("Requests:
\n"); Schema[] requests = key.messageType.requestSchemas(); - // See `ProduceRequest.MIN_VERSION` for details on why we need to do this - short oldestVersion = key == ApiKeys.PRODUCE ? ProduceRequest.MIN_VERSION : key.oldestVersion(); - for (short version = oldestVersion; version <= key.latestVersion(); version++) { + for (short version = key.oldestVersion(); version <= key.latestVersion(); version++) { Schema schema = requests[version]; if (schema == null) throw new IllegalStateException("Unexpected null schema for " + key + " with version " + version); @@ -211,26 +208,26 @@ public static String toHtml() { // Responses b.append("Responses:
 \n"); Schema[] responses = key.messageType.responseSchemas(); - for (int i = oldestVersion; i < responses.length; i++) { - Schema schema = responses[i]; + for (int version = key.oldestVersion(); version <= key.latestVersion(); version++) { + Schema schema = responses[version]; + if (schema == null) + throw new IllegalStateException("Unexpected null schema for " + key + " with version " + version); // Schema - if (schema != null) { - b.append("
"); - // Version header - b.append("
");
-                    b.append(key.name);
-                    b.append(" Response (Version: ");
-                    b.append(i);
-                    b.append(") => ");
-                    schemaToBnfHtml(responses[i], b, 2);
-                    b.append("
"); - - b.append("

Response header version: "); - b.append(key.responseHeaderVersion((short) i)); - b.append("

\n"); - - schemaToFieldTableHtml(responses[i], b); - } + b.append("
"); + // Version header + b.append("
");
+                b.append(key.name);
+                b.append(" Response (Version: ");
+                b.append(version);
+                b.append(") => ");
+                schemaToBnfHtml(responses[version], b, 2);
+                b.append("
"); + + b.append("

Response header version: "); + b.append(key.responseHeaderVersion((short) version)); + b.append("

\n"); + + schemaToFieldTableHtml(responses[version], b); b.append("
\n"); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 643b4e44c0e2d..4d5a7e2ef2d3d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -204,7 +204,7 @@ public static ApiVersionCollection filterApis( // Skip telemetry APIs if client telemetry is disabled. if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled) continue; - apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(apiKeys::add); + apiKey.toApiVersion(enableUnstableLastVersion, Optional.of(listenerType)).ifPresent(apiKeys::add); } return apiKeys; } @@ -215,7 +215,7 @@ public static ApiVersionCollection collectApis( ) { ApiVersionCollection res = new ApiVersionCollection(); for (ApiKeys apiKey : apiKeys) { - apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(res::add); + apiKey.toApiVersion(enableUnstableLastVersion, Optional.empty()).ifPresent(res::add); } return res; } @@ -238,7 +238,7 @@ public static ApiVersionCollection intersectForwardableApis( ) { ApiVersionCollection apiKeys = new ApiVersionCollection(); for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) { - final Optional brokerApiVersion = apiKey.toApiVersion(enableUnstableLastVersion); + final Optional brokerApiVersion = apiKey.toApiVersion(enableUnstableLastVersion, Optional.of(listenerType)); if (brokerApiVersion.isEmpty()) { // Broker does not support this API key. 
continue; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 5b76ed9a5ee27..44b5186ef0545 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -40,11 +40,7 @@ import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET; public class ProduceRequest extends AbstractRequest { - // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka, - // these versions have to be included in the api versions response (see KAFKA-18659), which means we cannot exclude - // them from the protocol definition. Instead, we reject requests with such versions in `KafkaApis` by returning - // `UnsupportedVersion` errors. We also special case the generated protocol html to exclude versions 0-2. - public static final short MIN_VERSION = 3; + public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 11; public static Builder builder(ProduceRequestData data, boolean useTransactionV1Version) { @@ -52,7 +48,7 @@ public static Builder builder(ProduceRequestData data, boolean useTransactionV1V // LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 so that the broker knows that we're using transaction protocol V1. short maxVersion = useTransactionV1Version ? 
LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 : ApiKeys.PRODUCE.latestVersion(); - return new Builder(MIN_VERSION, maxVersion, data); + return new Builder(ApiKeys.PRODUCE.oldestVersion(), maxVersion, data); } public static Builder builder(ProduceRequestData data) { diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java index 85ba9b46b31f0..5635657982720 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -104,7 +105,7 @@ public void testHasValidVersions() { public void testHtmlOnlyHaveStableApi() { String html = ApiKeys.toHtml(); for (ApiKeys apiKeys : ApiKeys.clientApis()) { - if (apiKeys.toApiVersion(false).isPresent()) { + if (apiKeys.toApiVersion(false, Optional.empty()).isPresent()) { assertTrue(html.contains("The_Messages_" + apiKeys.name), "Html should contain stable api: " + apiKeys.name); } else { assertFalse(html.contains("The_Messages_" + apiKeys.name), "Html should not contain unstable api: " + apiKeys.name); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index dd8b8144a29e9..99571f135296e 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -58,8 +58,11 @@ public void shouldHaveCorrectDefaultApiVersionsResponse(ApiMessageType.ListenerT for (ApiKeys key : ApiKeys.apisForListener(scope)) { ApiVersion version = defaultResponse.apiVersion(key.id); assertNotNull(version, 
"Could not find ApiVersion for API " + key.name); - assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect min version for Api " + key.name); - assertEquals(version.maxVersion(), key.latestVersion(), "Incorrect max version for Api " + key.name); + if (key == ApiKeys.PRODUCE) + assertEquals(key.messageType.lowestSupportedVersion(), version.minVersion(), "Incorrect min version for Api " + key.name); + else + assertEquals(key.oldestVersion(), version.minVersion(), "Incorrect min version for Api " + key.name); + assertEquals(key.latestVersion(), version.maxVersion(), "Incorrect max version for Api " + key.name); // Check if versions less than min version are indeed set as null, i.e., deprecated. for (int i = 0; i < version.minVersion(); ++i) { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java index 26bedc55e35b0..cb008e5f05a6c 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java @@ -212,7 +212,7 @@ public void testV6AndBelowCannotUseZStdCompression() { .setAcks((short) 1) .setTimeoutMs(1000); // Can't create ProduceRequest instance with version within [3, 7) - for (short version = ProduceRequest.MIN_VERSION; version < 7; version++) { + for (short version = ApiKeys.PRODUCE.oldestVersion(); version < 7; version++) { ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, produceData); assertThrowsForAllVersions(requestBuilder, UnsupportedCompressionTypeException.class); } @@ -276,6 +276,21 @@ public void testMixedIdempotentData() { assertTrue(RequestTestUtils.hasIdempotentRecords(request)); } + @Test + public void testBuilderOldestAndLatestAllowed() { + ProduceRequest.Builder builder = ProduceRequest.builder(new ProduceRequestData() + .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( + new ProduceRequestData.TopicProduceData() + .setName("topic") + .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData() + .setIndex(1) + .setRecords(MemoryRecords.withRecords(Compression.NONE, simpleRecord))))).iterator())) + .setAcks((short) -1) + .setTimeoutMs(10)); + assertEquals(ApiKeys.PRODUCE_OLDEST_VERSION, builder.oldestAllowedVersion()); + assertEquals(ApiKeys.PRODUCE.latestVersion(), builder.latestAllowedVersion()); + } + private static void assertThrowsForAllVersions(ProduceRequest.Builder builder, Class expectedType) { IntStream.range(builder.oldestAllowedVersion(), builder.latestAllowedVersion() + 1) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c1afc5298b873..18795a7e0f38d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -374,11 +374,6 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { val produceRequest = request.body[ProduceRequest] - // See `ProduceRequest.MIN_VERSION` for details on why we need to do this - if (produceRequest.version < ProduceRequest.MIN_VERSION) { - requestHelper.sendErrorResponseMaybeThrottle(request, Errors.UNSUPPORTED_VERSION.exception()) - return; - } if (RequestUtils.hasTransactionalRecords(produceRequest)) { val isAuthorizedTransactional = produceRequest.transactionalId != null && diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index f71fc37bb2ff7..19cb1188d8f3e 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -116,5 +116,10 @@ abstract 
class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { assertEquals(expectedApiVersion.minVersion, actualApiVersion.minVersion, s"Received unexpected min version for API key ${actualApiVersion.apiKey}.") assertEquals(expectedApiVersion.maxVersion, actualApiVersion.maxVersion, s"Received unexpected max version for API key ${actualApiVersion.apiKey}.") } + + if (listenerName.equals(cluster.clientListener)) { + // See ApiKeys.PRODUCE_OLDEST_VERSION for details on why this is `0` (instead of `3`) + assertEquals(0, apiVersionsResponse.apiVersion(ApiKeys.PRODUCE.id).minVersion) + } } } diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index 3a0ffe1b4779f..e69aa3ea82b18 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -78,7 +78,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness { new Socket("localhost", socketServer.boundPort(listenerName)) } - private def sendRequest(socket: Socket, request: Array[Byte]): Unit = { + private[server] def sendRequest(socket: Socket, request: Array[Byte]): Unit = { val outgoing = new DataOutputStream(socket.getOutputStream) outgoing.writeInt(request.length) outgoing.write(request) diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 9ae2b35841fc8..fa530ae18ff1f 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -27,7 +27,8 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.message.ProduceRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse} +import 
org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse, RequestUtils} +import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics @@ -37,6 +38,7 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource} import org.junit.jupiter.params.provider.ValueSource +import java.io.EOFException import java.util.concurrent.TimeUnit import scala.jdk.CollectionConverters._ @@ -285,18 +287,17 @@ class ProduceRequestTest extends BaseRequestTest { } /** - * See `ProduceRequest.MIN_VERSION` for the details on why we need special handling for produce request v0-v2 (inclusive). + * See `ApiKeys.PRODUCE_OLDEST_VERSION` for the details of why we need special handling for produce request v0-v2 (inclusive). */ @Test - def testProduceRequestV0V1V2FailsWithUnsupportedVersion(): Unit = { + def testProduceRequestV0ToV2IsRejectedByBroker(): Unit = { val topic = "topic" val partition = 0 val partitionToLeader = createTopic(topic) val leader = partitionToLeader(partition) val memoryRecords = MemoryRecords.withRecords(Compression.none().build(), new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes)) - val topicPartition = new TopicPartition("topic", partition) - val partitionRecords = new ProduceRequestData() + val produceRequestData = new ProduceRequestData() .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( new ProduceRequestData.TopicProduceData() .setName("topic").setPartitionData(Collections.singletonList( @@ -308,19 +309,25 @@ class ProduceRequestTest extends BaseRequestTest { .setTimeoutMs(3000) .setTransactionalId(null) - for (i <- 0 until ProduceRequest.MIN_VERSION) { + for (i <- 0 to 2) { val version = i.toShort - val produceResponse1 = sendProduceRequest(leader, new 
ProduceRequest.Builder(version, version, partitionRecords).build()) - val topicProduceResponse1 = produceResponse1.data.responses.asScala.head - val partitionProduceResponse1 = topicProduceResponse1.partitionResponses.asScala.head - val tp1 = new TopicPartition(topicProduceResponse1.name, partitionProduceResponse1.index) - assertEquals(topicPartition, tp1) - assertEquals(Errors.UNSUPPORTED_VERSION, Errors.forCode(partitionProduceResponse1.errorCode)) - assertEquals(-1, partitionProduceResponse1.baseOffset) - assertEquals(-1, partitionProduceResponse1.logAppendTimeMs) + // Broker disconnects when it receives a request with an unsupported version + assertThrows(classOf[EOFException], () => sendProduceRequestData(leader, version, produceRequestData)) } } + // This method avoids some of the version validation performed by the wrapper request classes, which is useful + // to be able to send produce requests with versions 0-2 + private def sendProduceRequestData(leaderId: Int, version: Short, request: ProduceRequestData): ProduceResponse = { + val socket = connect(brokerSocketServer(leaderId), listenerName) + try { + val header = nextRequestHeader(ApiKeys.PRODUCE, version) + val serializedBytes = Utils.toArray(RequestUtils.serialize(header.data, header.headerVersion, request, version)) + sendRequest(socket, serializedBytes) + receive[ProduceResponse](socket, ApiKeys.PRODUCE, version) + } finally socket.close() + } + private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = { connectAndReceive[ProduceResponse](request, destination = brokerSocketServer(leaderId)) } From 6420650749c622a526e7b22e459ff766b5a9fc3b Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 30 Jan 2025 15:31:32 -0800 Subject: [PATCH 3/9] Address review feedback --- .../kafka/common/requests/ProduceRequest.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index 44b5186ef0545..a9f5205a308fe 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -67,16 +67,10 @@ public Builder(short minVersion, @Override public ProduceRequest build(short version) { - return build(version, true); - } - - private ProduceRequest build(short version, boolean validate) { - if (validate) { - // Validate the given records first - data.topicData().forEach(tpd -> - tpd.partitionData().forEach(partitionProduceData -> - ProduceRequest.validateRecords(version, partitionProduceData.records()))); - } + // Validate the given records first + data.topicData().forEach(tpd -> + tpd.partitionData().forEach(partitionProduceData -> + ProduceRequest.validateRecords(version, partitionProduceData.records()))); return new ProduceRequest(data, version); } From b38a7ef233f05701a0a25cb24d7111eb455ee76e Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 30 Jan 2025 15:31:50 -0800 Subject: [PATCH 4/9] Fix failing test --- .../org/apache/kafka/tools/BrokerApiVersionsCommandTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java index 49534a0ca8245..b1cc54c828a29 100644 --- a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java @@ -55,7 +55,7 @@ public void testBrokerApiVersionsCommandOutput(ClusterInstance clusterInstance) ApiMessageType.ListenerType listenerType = ApiMessageType.ListenerType.BROKER; NodeApiVersions nodeApiVersions = new NodeApiVersions( - ApiVersionsResponse.collectApis(ApiKeys.clientApis(), true), + ApiVersionsResponse.filterApis(listenerType, true, true), 
Collections.emptyList()); Iterator apiKeysIter = ApiKeys.clientApis().iterator(); while (apiKeysIter.hasNext()) { @@ -64,7 +64,7 @@ public void testBrokerApiVersionsCommandOutput(ClusterInstance clusterInstance) StringBuilder lineBuilder = new StringBuilder().append("\t"); if (apiKey.inScope(listenerType)) { ApiVersion apiVersion = nodeApiVersions.apiVersion(apiKey); - assertNotNull(apiVersion); + assertNotNull(apiVersion, "No apiVersion found for " + apiKey); String versionRangeStr = (apiVersion.minVersion() == apiVersion.maxVersion()) ? String.valueOf(apiVersion.minVersion()) : From 593db8b4b988cab3511e54a1738e371616bbb7e0 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 30 Jan 2025 15:35:47 -0800 Subject: [PATCH 5/9] Adjust comments --- .../org/apache/kafka/common/protocol/ApiKeys.java | 11 +++++++---- .../main/resources/common/message/ProduceRequest.json | 5 ++--- .../resources/common/message/ProduceResponse.json | 5 ++--- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index df65eb254ebb4..6dd6159578a20 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -150,9 +150,12 @@ public enum ApiKeys { .collect(Collectors.toMap(key -> (int) key.id, Function.identity())); // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka, - // these versions have to be included in the api versions response (see KAFKA-18659), which means we cannot exclude - // them from the protocol definition. Instead, we reject requests with such versions in `KafkaApis` by returning - // `UnsupportedVersion` errors. We also special case the generated protocol html to exclude versions 0-2. + // these versions have to be included in the api versions response (see KAFKA-18659). 
In order to achieve that, + // we keep such versions in the protocol definition files, but override `oldestVersion` to return the correct value. + // We also adjust `toApiVersion` to return `0` for produce in the broker listener. + // An alternative approach would be to remove versions `0-2` from the protocol definition files and only override the + // behavior in this file - the main downside is that it would no longer be possible to send requests with produce v0-v2, + // which would make testing significantly harder (it would probably have to be a ducktape test). public static final short PRODUCE_OLDEST_VERSION = 3; /** the permanent and immutable id of an API - this can't change ever */ @@ -275,7 +278,7 @@ public boolean hasValidVersion() { public Optional toApiVersion(boolean enableUnstableLastVersion, Optional listenerType) { - // see `PRODUCE_MIN_VERSION` for details on why we do this + // see `PRODUCE_OLDEST_VERSION` for details on why we do this short oldestVersion = (this == PRODUCE && listenerType.map(l -> l == ApiMessageType.ListenerType.BROKER).orElse(false)) ? messageType.lowestSupportedVersion() : oldestVersion(); short latestVersion = latestVersion(enableUnstableLastVersion); diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index 7f55cf09cb3ec..9b3fac2e8059e 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -19,9 +19,8 @@ "listeners": ["broker"], "name": "ProduceRequest", // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka, - // these versions have to be included in the api versions response (see KAFKA-18659), which means we cannot exclude - // them from `validVersions`. Instead, we reject requests with such versions in `KafkaApis` by returning - // `UnsupportedVersion` errors. 
+ // these versions have to be included in the api versions response (see KAFKA-18659) and are included in `validVersion`. + // See `ApiKeys.PRODUCE_OLDEST_VERSION` for more details. // // Version 1 and 2 are the same as version 0. // diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index cf67acaa82cbd..0161e89f29719 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -18,9 +18,8 @@ "type": "response", "name": "ProduceResponse", // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka, - // these versions have to be included in the api versions response (see KAFKA-18659), which means we cannot exclude - // them from `validVersions`. Instead, we reject requests with such versions in `KafkaApis` by returning - // `UnsupportedVersion` errors. + // these versions have to be included in the api versions response (see KAFKA-18659) and are included in `validVersion`. + // See `ApiKeys.PRODUCE_OLDEST_VERSION` for more details. // // Version 1 added the throttle time. // Version 2 added the log append time. 
From 55056f24edaa36ac93eaf70a4639ab7756cf38b4 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 31 Jan 2025 16:44:54 -0800 Subject: [PATCH 6/9] Simplify --- .../apache/kafka/common/protocol/ApiKeys.java | 17 ++----- .../common/message/ProduceRequest.json | 6 +-- .../common/message/ProduceResponse.json | 6 +-- .../requests/ApiVersionsResponseTest.java | 9 ++-- .../common/requests/ProduceRequestTest.java | 2 +- .../unit/kafka/network/ProcessorTest.scala | 22 ++++++++- .../AbstractApiVersionsRequestTest.scala | 6 +-- .../kafka/server/ProduceRequestTest.scala | 47 +------------------ 8 files changed, 41 insertions(+), 74 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 6dd6159578a20..9af55f057e8d7 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -150,13 +150,9 @@ public enum ApiKeys { .collect(Collectors.toMap(key -> (int) key.id, Function.identity())); // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka, - // these versions have to be included in the api versions response (see KAFKA-18659). In order to achieve that, - // we keep such versions in the protocol definition files, but override `oldestVersion` to return the correct value. - // We also adjust `toApiVersion` to return `0` for produce in the broker listener. - // An alternative approach would be to remove versions `0-2` from the protocol definition files and only override the - // behavior in this file - the main downside is that it would no longer be possible to send requests with produce v0-v2, - // which would make testing significantly harder (it would probably have to be a ducktape test). 
- public static final short PRODUCE_OLDEST_VERSION = 3; + // version `0` has to be included in the api versions response (see KAFKA-18659). In order to achieve that, + // we adjust `toApiVersion` to return `0` for the min version of `produce` in the broker listener. + public static final short PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION = 0; /** the permanent and immutable id of an API - this can't change ever */ public final short id; @@ -236,9 +232,6 @@ public short latestVersion(boolean enableUnstableLastVersion) { } public short oldestVersion() { - // See #PRODUCE_OLDEST_VERSION for details of why we do this - if (this == PRODUCE) - return PRODUCE_OLDEST_VERSION; return messageType.lowestSupportedVersion(); } @@ -278,9 +271,9 @@ public boolean hasValidVersion() { public Optional toApiVersion(boolean enableUnstableLastVersion, Optional listenerType) { - // see `PRODUCE_OLDEST_VERSION` for details on why we do this + // see `PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details on why we do this short oldestVersion = (this == PRODUCE && listenerType.map(l -> l == ApiMessageType.ListenerType.BROKER).orElse(false)) ? - messageType.lowestSupportedVersion() : oldestVersion(); + PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION : oldestVersion(); short latestVersion = latestVersion(enableUnstableLastVersion); // API is entirely disabled if latestStableVersion is smaller than oldestVersion. diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json index 9b3fac2e8059e..0bb29f92378dd 100644 --- a/clients/src/main/resources/common/message/ProduceRequest.json +++ b/clients/src/main/resources/common/message/ProduceRequest.json @@ -19,8 +19,8 @@ "listeners": ["broker"], "name": "ProduceRequest", // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. 
Due to a bug in librdkafka, - // these versions have to be included in the api versions response (see KAFKA-18659) and are included in `validVersion`. - // See `ApiKeys.PRODUCE_OLDEST_VERSION` for more details. + // these versions have to be included in the api versions response (see KAFKA-18659), but are rejected otherwise. + // See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for more details. // // Version 1 and 2 are the same as version 0. // @@ -46,7 +46,7 @@ // transaction V2 (KIP_890 part 2) is enabled, the produce request will also include the function for a // AddPartitionsToTxn call. If V2 is disabled, the client can't use produce request version higher than 11 within // a transaction. - "validVersions": "0-12", + "validVersions": "3-12", "flexibleVersions": "9+", "fields": [ { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId", diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json index 0161e89f29719..fafcd86401d40 100644 --- a/clients/src/main/resources/common/message/ProduceResponse.json +++ b/clients/src/main/resources/common/message/ProduceResponse.json @@ -18,8 +18,8 @@ "type": "response", "name": "ProduceResponse", // Versions 0-2 were removed in Apache Kafka 4.0, version 3 is the new baseline. Due to a bug in librdkafka, - // these versions have to be included in the api versions response (see KAFKA-18659) and are included in `validVersion`. - // See `ApiKeys.PRODUCE_OLDEST_VERSION` for more details. + // these versions have to be included in the api versions response (see KAFKA-18659), but are rejected otherwise. + // See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for more details. // // Version 1 added the throttle time. // Version 2 added the log append time. @@ -40,7 +40,7 @@ // Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890). 
// // Version 12 is the same as version 10 (KIP-890). - "validVersions": "0-12", + "validVersions": "3-12", "flexibleVersions": "9+", "fields": [ { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+", diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index 99571f135296e..c94d021ef214a 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -59,12 +59,12 @@ public void shouldHaveCorrectDefaultApiVersionsResponse(ApiMessageType.ListenerT ApiVersion version = defaultResponse.apiVersion(key.id); assertNotNull(version, "Could not find ApiVersion for API " + key.name); if (key == ApiKeys.PRODUCE) - assertEquals(key.messageType.lowestSupportedVersion(), version.minVersion(), "Incorrect min version for Api " + key.name); + assertEquals(ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION, version.minVersion(), "Incorrect min version for Api " + key.name); else assertEquals(key.oldestVersion(), version.minVersion(), "Incorrect min version for Api " + key.name); assertEquals(key.latestVersion(), version.maxVersion(), "Incorrect max version for Api " + key.name); - // Check if versions less than min version are indeed set as null, i.e., deprecated. + // Check if versions less than min version are indeed set as null, i.e., removed. 
for (int i = 0; i < version.minVersion(); ++i) { assertNull(key.messageType.requestSchemas()[i], "Request version " + i + " for API " + version.apiKey() + " must be null"); @@ -72,8 +72,11 @@ public void shouldHaveCorrectDefaultApiVersionsResponse(ApiMessageType.ListenerT "Response version " + i + " for API " + version.apiKey() + " must be null"); } + // The min version returned in ApiResponse for Produce is not the actual min version, so adjust it + var minVersion = (key == ApiKeys.PRODUCE && scope == ListenerType.BROKER) ? + ApiKeys.PRODUCE.oldestVersion() : version.minVersion(); // Check if versions between min and max versions are non null, i.e., valid. - for (int i = version.minVersion(); i <= version.maxVersion(); ++i) { + for (int i = minVersion; i <= version.maxVersion(); ++i) { assertNotNull(key.messageType.requestSchemas()[i], "Request version " + i + " for API " + version.apiKey() + " must not be null"); assertNotNull(key.messageType.responseSchemas()[i], diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java index cb008e5f05a6c..4ea5df30b98b1 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java @@ -287,7 +287,7 @@ public void testBuilderOldestAndLatestAllowed() { .setRecords(MemoryRecords.withRecords(Compression.NONE, simpleRecord))))).iterator())) .setAcks((short) -1) .setTimeoutMs(10)); - assertEquals(ApiKeys.PRODUCE_OLDEST_VERSION, builder.oldestAllowedVersion()); + assertEquals(ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION, builder.oldestAllowedVersion()); assertEquals(ApiKeys.PRODUCE.latestVersion(), builder.latestAllowedVersion()); } diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala index 3a862678ca79b..659aa2c6323c9 100644 
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala +++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala @@ -65,13 +65,31 @@ class ProcessorTest { @Test def testParseRequestHeaderWithUnsupportedApiVersion(): Unit = { val requestHeader = RequestTestUtils.serializeRequestHeader( - new RequestHeader(ApiKeys.PRODUCE, 0, "clientid", 0)) + new RequestHeader(ApiKeys.FETCH, 0, "clientid", 0)) val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0)) val e = assertThrows(classOf[UnsupportedVersionException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, - "PRODUCE v0 should throw UnsupportedVersionException exception") + "FETCH v0 should throw UnsupportedVersionException exception") assertTrue(e.toString.contains("unsupported version")); } + /** + * We do something unusual with these versions of produce, and we want to make sure we don't regress. + * See `ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details. 
+ */ + @Test + def testParseRequestHeaderForProduceV0ToV2(): Unit = { + for (version <- 0 to 2) { + val requestHeader = RequestTestUtils.serializeRequestHeader( + new RequestHeader(ApiKeys.PRODUCE, version.toShort, "clientid", 0)) + val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, + () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0)) + val e = assertThrows(classOf[UnsupportedVersionException], + (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, + s"PRODUCE $version should throw UnsupportedVersionException exception") + assertTrue(e.toString.contains("unsupported version")); + } + } + } diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index 19cb1188d8f3e..ea12bf0ee53f6 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -117,9 +117,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { assertEquals(expectedApiVersion.maxVersion, actualApiVersion.maxVersion, s"Received unexpected max version for API key ${actualApiVersion.apiKey}.") } - if (listenerName.equals(cluster.clientListener)) { - // See ApiKeys.PRODUCE_OLDEST_VERSION for details on why this is `0` (instead of `3`) - assertEquals(0, apiVersionsResponse.apiVersion(ApiKeys.PRODUCE.id).minVersion) - } + if (listenerName.equals(cluster.clientListener)) + assertEquals(ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION, apiVersionsResponse.apiVersion(ApiKeys.PRODUCE.id).minVersion) } } diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index fa530ae18ff1f..5ab33d868a1ff 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -27,18 +27,15 @@ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.message.ProduceRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record._ -import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse, RequestUtils} -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, MethodSource} import org.junit.jupiter.params.provider.ValueSource -import java.io.EOFException import java.util.concurrent.TimeUnit import scala.jdk.CollectionConverters._ @@ -286,48 +283,6 @@ class ProduceRequestTest extends BaseRequestTest { assertEquals(-1, partitionProduceResponse1.logAppendTimeMs) } - /** - * See `ApiKeys.PRODUCE_OLDEST_VERSION` for the details of why we need special handling for produce request v0-v2 (inclusive). 
- */ - @Test - def testProduceRequestV0ToV2IsRejectedByBroker(): Unit = { - val topic = "topic" - val partition = 0 - val partitionToLeader = createTopic(topic) - val leader = partitionToLeader(partition) - val memoryRecords = MemoryRecords.withRecords(Compression.none().build(), - new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes)) - val produceRequestData = new ProduceRequestData() - .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Collections.singletonList( - new ProduceRequestData.TopicProduceData() - .setName("topic").setPartitionData(Collections.singletonList( - new ProduceRequestData.PartitionProduceData() - .setIndex(partition) - .setRecords(memoryRecords)))) - .iterator)) - .setAcks((-1).toShort) - .setTimeoutMs(3000) - .setTransactionalId(null) - - for (i <- 0 to 2) { - val version = i.toShort - // Broker disconnects when it receives a request with an unsupported version - assertThrows(classOf[EOFException], () => sendProduceRequestData(leader, version, produceRequestData)) - } - } - - // This method avoids some of the version validation performed by the wrapper request classes, which is useful - // to be able to send produce requests with versions 0-2 - private def sendProduceRequestData(leaderId: Int, version: Short, request: ProduceRequestData): ProduceResponse = { - val socket = connect(brokerSocketServer(leaderId), listenerName) - try { - val header = nextRequestHeader(ApiKeys.PRODUCE, version) - val serializedBytes = Utils.toArray(RequestUtils.serialize(header.data, header.headerVersion, request, version)) - sendRequest(socket, serializedBytes) - receive[ProduceResponse](socket, ApiKeys.PRODUCE, version) - } finally socket.close() - } - private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = { connectAndReceive[ProduceResponse](request, destination = brokerSocketServer(leaderId)) } From cff110cd933a6db503804dd6cbcdf38ef2d47f02 Mon Sep 17 00:00:00 2001 From: Ismael Juma 
Date: Fri, 31 Jan 2025 17:05:14 -0800 Subject: [PATCH 7/9] Fix test --- .../apache/kafka/common/requests/ProduceRequestTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java index 4ea5df30b98b1..42a1e1f39681d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java @@ -283,11 +283,12 @@ public void testBuilderOldestAndLatestAllowed() { new ProduceRequestData.TopicProduceData() .setName("topic") .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData() - .setIndex(1) - .setRecords(MemoryRecords.withRecords(Compression.NONE, simpleRecord))))).iterator())) + .setIndex(1) + .setRecords(MemoryRecords.withRecords(Compression.NONE, simpleRecord)))) + ).iterator())) .setAcks((short) -1) .setTimeoutMs(10)); - assertEquals(ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION, builder.oldestAllowedVersion()); + assertEquals(ApiKeys.PRODUCE.oldestVersion(), builder.oldestAllowedVersion()); assertEquals(ApiKeys.PRODUCE.latestVersion(), builder.latestAllowedVersion()); } From 4d0227aefa446be8b133312085d82f7fc878c43d Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 31 Jan 2025 17:09:57 -0800 Subject: [PATCH 8/9] Revert change that is no longer required --- core/src/test/scala/unit/kafka/server/BaseRequestTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index e69aa3ea82b18..3a0ffe1b4779f 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -78,7 +78,7 @@ abstract class BaseRequestTest extends 
IntegrationTestHarness { new Socket("localhost", socketServer.boundPort(listenerName)) } - private[server] def sendRequest(socket: Socket, request: Array[Byte]): Unit = { + private def sendRequest(socket: Socket, request: Array[Byte]): Unit = { val outgoing = new DataOutputStream(socket.getOutputStream) outgoing.writeInt(request.length) outgoing.write(request) From 04db4ebcfe3122144b69e2b7f69213c09a7fbae7 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sat, 1 Feb 2025 10:22:58 -0800 Subject: [PATCH 9/9] Address review comments --- .../apache/kafka/common/protocol/ApiKeys.java | 21 ++++++++++++++++++- .../common/requests/ApiVersionsResponse.java | 7 ++++--- .../kafka/common/protocol/ApiKeysTest.java | 3 +-- .../kafka/server/ApiVersionManager.scala | 2 +- .../unit/kafka/network/ProcessorTest.scala | 19 ++++++++++------- .../AbstractApiVersionsRequestTest.scala | 1 + 6 files changed, 38 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 9af55f057e8d7..1c752945563cd 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -269,7 +269,26 @@ public boolean hasValidVersion() { return oldestVersion() <= latestVersion(); } - public Optional toApiVersion(boolean enableUnstableLastVersion, + /** + * To workaround a critical bug in librdkafka, the api versions response is inconsistent with the actual versions + * supported by `produce` - this method handles that. It should be called in the context of the api response protocol + * handling. + * + * It should not be used by code generating protocol documentation - we keep that consistent with the actual versions + * supported by `produce`. + * + * See `PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details. 
+ */ + public Optional toApiVersionForApiResponse(boolean enableUnstableLastVersion, + ApiMessageType.ListenerType listenerType) { + return toApiVersion(enableUnstableLastVersion, Optional.of(listenerType)); + } + + public Optional toApiVersion(boolean enableUnstableLastVersion) { + return toApiVersion(enableUnstableLastVersion, Optional.empty()); + } + + private Optional toApiVersion(boolean enableUnstableLastVersion, Optional listenerType) { // see `PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION` for details on why we do this short oldestVersion = (this == PRODUCE && listenerType.map(l -> l == ApiMessageType.ListenerType.BROKER).orElse(false)) ? diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 4d5a7e2ef2d3d..324e527984d08 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -204,18 +204,19 @@ public static ApiVersionCollection filterApis( // Skip telemetry APIs if client telemetry is disabled. 
if ((apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == ApiKeys.PUSH_TELEMETRY) && !clientTelemetryEnabled) continue; - apiKey.toApiVersion(enableUnstableLastVersion, Optional.of(listenerType)).ifPresent(apiKeys::add); + apiKey.toApiVersionForApiResponse(enableUnstableLastVersion, listenerType).ifPresent(apiKeys::add); } return apiKeys; } public static ApiVersionCollection collectApis( + ApiMessageType.ListenerType listenerType, Set apiKeys, boolean enableUnstableLastVersion ) { ApiVersionCollection res = new ApiVersionCollection(); for (ApiKeys apiKey : apiKeys) { - apiKey.toApiVersion(enableUnstableLastVersion, Optional.empty()).ifPresent(res::add); + apiKey.toApiVersionForApiResponse(enableUnstableLastVersion, listenerType).ifPresent(res::add); } return res; } @@ -238,7 +239,7 @@ public static ApiVersionCollection intersectForwardableApis( ) { ApiVersionCollection apiKeys = new ApiVersionCollection(); for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) { - final Optional brokerApiVersion = apiKey.toApiVersion(enableUnstableLastVersion, Optional.of(listenerType)); + final Optional brokerApiVersion = apiKey.toApiVersionForApiResponse(enableUnstableLastVersion, listenerType); if (brokerApiVersion.isEmpty()) { // Broker does not support this API key. 
continue; diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java index 5635657982720..85ba9b46b31f0 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; -import java.util.Optional; import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -105,7 +104,7 @@ public void testHasValidVersions() { public void testHtmlOnlyHaveStableApi() { String html = ApiKeys.toHtml(); for (ApiKeys apiKeys : ApiKeys.clientApis()) { - if (apiKeys.toApiVersion(false, Optional.empty()).isPresent()) { + if (apiKeys.toApiVersion(false).isPresent()) { assertTrue(html.contains("The_Messages_" + apiKeys.name), "Html should contain stable api: " + apiKeys.name); } else { assertFalse(html.contains("The_Messages_" + apiKeys.name), "Html should not contain unstable api: " + apiKeys.name); diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala index fd1c70e509fff..e286bc9352ac0 100644 --- a/core/src/main/scala/kafka/server/ApiVersionManager.scala +++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala @@ -94,7 +94,7 @@ class SimpleApiVersionManager( ) } - private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion) + private val apiVersions = ApiVersionsResponse.collectApis(listenerType, enabledApis.asJava, enableUnstableLastVersion) override def apiVersionResponse( throttleTimeMs: Int, diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala index 659aa2c6323c9..66f3c5d5c7729 100644 --- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala +++ 
b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala @@ -17,16 +17,19 @@ package kafka.network -import kafka.server.SimpleApiVersionManager +import kafka.server.metadata.KRaftMetadataCache +import kafka.server.{DefaultApiVersionManager, ForwardingManager, SimpleApiVersionManager} import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException} import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.message.RequestHeaderData import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{RequestHeader, RequestTestUtils} -import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion} +import org.apache.kafka.server.BrokerFeatures +import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion} import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue} import org.junit.jupiter.api.Test import org.junit.jupiter.api.function.Executable +import org.mockito.Mockito.mock import java.util.Collections @@ -54,8 +57,8 @@ class ProcessorTest { .setClientId("clientid") .setCorrelationId(0); val requestHeader = RequestTestUtils.serializeRequestHeader(new RequestHeader(requestHeaderData, headerVersion)) - val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0)) + val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, mock(classOf[ForwardingManager]), + BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => KRaftVersion.LATEST_PRODUCTION), true) val e = assertThrows(classOf[InvalidRequestException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, "LEADER_AND_ISR should throw InvalidRequestException exception") @@ -66,8 +69,8 @@ class ProcessorTest { def testParseRequestHeaderWithUnsupportedApiVersion(): Unit = { val requestHeader = 
RequestTestUtils.serializeRequestHeader( new RequestHeader(ApiKeys.FETCH, 0, "clientid", 0)) - val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0)) + val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, mock(classOf[ForwardingManager]), + BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => KRaftVersion.LATEST_PRODUCTION), true) val e = assertThrows(classOf[UnsupportedVersionException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, "FETCH v0 should throw UnsupportedVersionException exception") @@ -83,8 +86,8 @@ class ProcessorTest { for (version <- 0 to 2) { val requestHeader = RequestTestUtils.serializeRequestHeader( new RequestHeader(ApiKeys.PRODUCE, version.toShort, "clientid", 0)) - val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, - () => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0)) + val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, mock(classOf[ForwardingManager]), + BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => KRaftVersion.LATEST_PRODUCTION), true) val e = assertThrows(classOf[UnsupportedVersionException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable, s"PRODUCE $version should throw UnsupportedVersionException exception") diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index ea12bf0ee53f6..900fb0f66fbc4 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -88,6 +88,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { } 
val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) { ApiVersionsResponse.collectApis( + ApiMessageType.ListenerType.CONTROLLER, ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER), enableUnstableLastVersion )