KAFKA-18672; CoordinatorRecordSerde must validate value version #18749

Merged (3 commits) on Feb 3, 2025
@@ -28,23 +28,6 @@
*/
public interface CoordinatorLoader<U> extends AutoCloseable {

/**
* UnknownRecordTypeException is thrown when the Deserializer encounters
* an unknown record type.
*/
class UnknownRecordTypeException extends RuntimeException {
private final short unknownType;

public UnknownRecordTypeException(short unknownType) {
super(String.format("Found an unknown record type %d", unknownType));
this.unknownType = unknownType;
}

public short unknownType() {
return unknownType;
}
}

/**
* Object that is returned as part of the future from load(). Holds the partition load time and the
* end time.
@@ -76,6 +76,11 @@ public CoordinatorRecord deserialize(

final ApiMessage valueMessage = apiMessageValueFor(recordType);
final short valueVersion = readVersion(valueBuffer, "value");

if (valueVersion < valueMessage.lowestSupportedVersion() || valueVersion > valueMessage.highestSupportedVersion()) {
throw new UnknownRecordVersionException(recordType, valueVersion);
}
Member (Author) commented on lines +80 to +82:
This is the main change.

Member:
The UnknownRecordTypeException is caught because an aborted upgrade might leave behind some "new" records. Should we also consider handling UnknownRecordVersionException?

Member (Author):
That's a good question. I think we would need a new KIP, or to extend KIP-915 (Txn and Group Coordinator Downgrade Foundation), for this. In KIP-915, we defined that using tagged fields should be the way forward for handling backward compatibility. Hence, I am not sure that ignoring newer versions is needed.
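
A minimal sketch of the pattern discussed above, assuming the deserialize(key, value) shape used by the tests in this diff; it is not code from this PR, and the ReplaySketch class, replayOne method, and logging are illustrative. It shows the resulting contract: UnknownRecordTypeException is tolerated (the record is skipped), while the new UnknownRecordVersionException propagates and fails the load.

import java.nio.ByteBuffer;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.Deserializer;

class ReplaySketch {
    // Sketch only; the real replay loop is in the Scala loader changed later in this diff.
    static CoordinatorRecord replayOne(
        Deserializer<CoordinatorRecord> deserializer,
        ByteBuffer keyBuffer,
        ByteBuffer valueBuffer
    ) {
        try {
            return deserializer.deserialize(keyBuffer, valueBuffer);
        } catch (Deserializer.UnknownRecordTypeException e) {
            // Tolerated: an aborted upgrade may have left records of a newer type behind.
            System.out.printf("Skipping record with unknown type %d%n", e.unknownType());
            return null;
        } catch (Deserializer.UnknownRecordVersionException e) {
            // Not handled here: the type is known but the value version is outside the
            // supported range, so the load fails rather than misparsing the payload.
            // Compatible additions are expected to use tagged fields per KIP-915.
            throw e;
        }
    }
}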


readMessage(valueMessage, valueBuffer, valueVersion, "value");

return CoordinatorRecord.record(
@@ -24,6 +24,42 @@
* @param <T> The record type.
*/
public interface Deserializer<T> {
/**
* UnknownRecordTypeException is thrown when the Deserializer encounters
* an unknown record type.
*/
class UnknownRecordTypeException extends RuntimeException {
Member (Author): I just moved this one from the Loader to here.

private final short unknownType;

public UnknownRecordTypeException(short unknownType) {
super(String.format("Found an unknown record type %d", unknownType));
this.unknownType = unknownType;
}

public short unknownType() {
return unknownType;
}
}

class UnknownRecordVersionException extends RuntimeException {
private final short type;
private final short unknownVersion;

public UnknownRecordVersionException(short type, short unknownVersion) {
super(String.format("Found an unknown record version %d for %d type", unknownVersion, type));
this.type = type;
this.unknownVersion = unknownVersion;
}

public short type() {
return type;
}

public short unknownVersion() {
return unknownVersion;
}
}

/**
* Deserializes the key and the value.
*
@@ -23,7 +23,8 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{ControlRecordType, FileRecords, MemoryRecords}
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.{LoadSummary, UnknownRecordTypeException}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.LoadSummary
import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoader, CoordinatorPlayback, Deserializer}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.KafkaScheduler
core/src/main/scala/kafka/tools/DumpLogSegments.scala (1 addition & 1 deletion)
@@ -37,8 +37,8 @@ import org.apache.kafka.common.protocol.{ApiMessage, ByteBufferAccessor}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, GroupMetadataValueJsonConverter, CoordinatorRecordJsonConverters => GroupCoordinatorRecordJsonConverters, CoordinatorRecordType => GroupCoordinatorRecordType}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde
import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.generated.{CoordinatorRecordJsonConverters => ShareCoordinatorRecordJsonConverters}
@@ -24,7 +24,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.{CoordinatorPlayback, Deserializer}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogOffsetMetadata}
@@ -18,7 +18,6 @@

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;

@@ -31,7 +30,7 @@ protected ApiMessage apiMessageKeyFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}

@@ -40,7 +39,7 @@ protected ApiMessage apiMessageValueFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
}
@@ -18,8 +18,8 @@

import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
@@ -121,8 +121,8 @@ public void testDeserializeWithInvalidRecordType() {

ByteBuffer valueBuffer = ByteBuffer.allocate(64);

CoordinatorLoader.UnknownRecordTypeException ex =
assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
Deserializer.UnknownRecordTypeException ex =
assertThrows(Deserializer.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}
@@ -198,6 +198,34 @@ public void testDeserializeWithInvalidValueBytes() {
ex.getMessage());
}

@Test
public void testDeserializeWithInvalidValueVersion() {
GroupCoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde();

ApiMessage key = new ConsumerGroupMetadataKey().setGroupId("foo");
ByteBuffer keyBuffer = MessageUtil.toCoordinatorTypePrefixedByteBuffer(key);

ByteBuffer valueBuffer1 = ByteBuffer.allocate(2);
valueBuffer1.putShort((short) (ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1));
valueBuffer1.rewind();

Deserializer.UnknownRecordVersionException ex =
assertThrows(Deserializer.UnknownRecordVersionException.class,
() -> serde.deserialize(keyBuffer, valueBuffer1));
assertEquals(key.apiKey(), ex.type());
assertEquals(ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1, ex.unknownVersion());

keyBuffer.rewind();
ByteBuffer valueBuffer2 = ByteBuffer.allocate(2);
valueBuffer2.putShort((short) (ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION - 1));
valueBuffer2.rewind();

ex = assertThrows(Deserializer.UnknownRecordVersionException.class,
() -> serde.deserialize(keyBuffer, valueBuffer2));
assertEquals(key.apiKey(), ex.type());
assertEquals(ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION - 1, ex.unknownVersion());
}

@Test
public void testDeserializeAllRecordTypes() {
for (CoordinatorRecordType record : CoordinatorRecordType.values()) {
@@ -19,7 +19,6 @@

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;

@@ -29,7 +28,7 @@ protected ApiMessage apiMessageKeyFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}

@@ -38,7 +37,7 @@ protected ApiMessage apiMessageValueFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
}
@@ -19,8 +19,8 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
@@ -113,8 +113,8 @@ public void testDeserializeWithInvalidRecordType() {

ByteBuffer valueBuffer = ByteBuffer.allocate(64);

CoordinatorLoader.UnknownRecordTypeException ex =
assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
Deserializer.UnknownRecordTypeException ex =
assertThrows(Deserializer.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}
@@ -18,7 +18,6 @@

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordType;

Expand All @@ -29,7 +28,7 @@ protected ApiMessage apiMessageKeyFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}

Expand All @@ -38,7 +37,7 @@ protected ApiMessage apiMessageValueFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
}
@@ -18,8 +18,8 @@

import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -120,8 +120,8 @@ public void testDeserializeWithInvalidRecordType() {

ByteBuffer valueBuffer = ByteBuffer.allocate(64);

CoordinatorLoader.UnknownRecordTypeException ex =
assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
Deserializer.UnknownRecordTypeException ex =
assertThrows(Deserializer.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}