-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18672; CoordinatorRecordSerde must validate value version #18749
Conversation
if (valueVersion < valueMessage.lowestSupportedVersion() || valueVersion > valueMessage.highestSupportedVersion()) { | ||
throw new UnknownRecordVersionException(recordType, valueVersion); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the main change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The UnknownRecordTypeException
is caught as the aborted upgrade might leave behind some "new" records. Should we also consider handling UnknownRecordVersionException
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good question. I think that we would need to do a new KIP or to extend KIP-915: Txn and Group Coordinator Downgrade Foundation for this. In KIP-915, we defined that using tagged fields should be the way forward to handle backward compatibility. Hence, I am not sure that ignore newer version is needed.
* UnknownRecordTypeException is thrown when the Deserializer encounters | ||
* an unknown record type. | ||
*/ | ||
class UnknownRecordTypeException extends RuntimeException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just moved this one from the Loader to here.
...inator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. thanks!
CoordinatorRecordSerde does not validate the version of the value to check whether the version is supported by the current version of the software. This is problematic if a future and unsupported version of the record is read by an older version of the software because it would misinterpret the bytes. Hence CoordinatorRecordSerde must throw an error if the version is unknown. This is also consistent with the handling in the old coordinator.
Committer Checklist (excluded from commit message)