diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java index 688a55abfd740..19055f00081e2 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.requests.UpdateRaftVoterRequest; import org.apache.kafka.common.requests.VoteRequest; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.utils.ApiMessageUtils; import org.apache.kafka.server.util.InterBrokerSendThread; import org.apache.kafka.server.util.RequestAndCompletionHandler; @@ -156,7 +157,7 @@ private void sendOnComplete(RaftRequest.Outbound request, ClientResponse clientR private ApiMessage errorResponse(ApiMessage request, Errors error) { ApiKeys apiKey = ApiKeys.forId(request.apiKey()); - return RaftUtil.errorResponse(apiKey, error); + return ApiMessageUtils.parseErrorResponse(apiKey, error); } @Override diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 538eec64e91f5..29550fccc469d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -80,6 +80,14 @@ import org.apache.kafka.raft.internals.RemoveVoterHandler; import org.apache.kafka.raft.internals.ThresholdPurgatory; import org.apache.kafka.raft.internals.UpdateVoterHandler; +import org.apache.kafka.raft.utils.ApiMessageUtils; +import org.apache.kafka.raft.utils.BeginQuorumEpochRpc; +import org.apache.kafka.raft.utils.DescribeQuorumRpc; +import org.apache.kafka.raft.utils.DynamicReconfigRpc; +import org.apache.kafka.raft.utils.EndQuorumEpochRpc; +import org.apache.kafka.raft.utils.FetchRpc; +import org.apache.kafka.raft.utils.FetchSnapshotRpc; +import org.apache.kafka.raft.utils.VoteRpc; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.NotifyingRawSnapshotWriter; @@ -116,7 +124,6 @@ import java.util.stream.Collectors; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition; import static org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID; /** @@ -782,7 +789,7 @@ private VoteResponseData buildVoteResponse( Errors partitionLevelError, boolean voteGranted ) { - return RaftUtil.singletonVoteResponse( + return VoteRpc.singletonVoteResponse( listenerName, apiVersion, Errors.NONE, @@ -813,7 +820,7 @@ private VoteResponseData handleVoteRequest( return new VoteResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); } - if (!hasValidTopicPartition(request, log.topicPartition())) { + if (!VoteRpc.hasValidTopicPartition(request, log.topicPartition())) { // Until we support multi-raft, we treat individual topic partition mismatches as invalid requests return new VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code()); } @@ -870,7 +877,7 @@ private VoteResponseData handleVoteRequest( } // Check that the request was intended for this replica - Optional<ReplicaKey> voterKey = RaftUtil.voteRequestVoterKey(request, partitionRequest); + Optional<ReplicaKey> voterKey = VoteRpc.voteRequestVoterKey(request, partitionRequest); if (!isValidVoterKey(voterKey)) { logger.info( "A replica sent a voter key ({}) in the VOTE request that doesn't match the " + @@ -940,7 +947,7 @@ private boolean
handleVoteResponse( return handleTopLevelError(topLevelError, responseMetadata); } - if (!hasValidTopicPartition(response, log.topicPartition())) { + if (!VoteRpc.hasValidTopicPartition(response, log.topicPartition())) { return false; } @@ -1067,7 +1074,7 @@ private BeginQuorumEpochResponseData buildBeginQuorumEpochResponse( short apiVersion, Errors partitionLevelError ) { - return RaftUtil.singletonBeginQuorumEpochResponse( + return BeginQuorumEpochRpc.singletonBeginQuorumEpochResponse( listenerName, apiVersion, Errors.NONE, @@ -1097,7 +1104,7 @@ private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest( return new BeginQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); } - if (!hasValidTopicPartition(request, log.topicPartition())) { + if (!BeginQuorumEpochRpc.hasValidTopicPartition(request, log.topicPartition())) { // Until we support multi-raft, we treat topic partition mismatches as invalid requests return new BeginQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code()); } @@ -1133,7 +1140,7 @@ private BeginQuorumEpochResponseData handleBeginQuorumEpochRequest( ); // Check that the request was intended for this replica - Optional<ReplicaKey> voterKey = RaftUtil.beginQuorumEpochRequestVoterKey(request, partitionRequest); + Optional<ReplicaKey> voterKey = BeginQuorumEpochRpc.beginQuorumEpochRequestVoterKey(request, partitionRequest); if (!isValidVoterKey(voterKey)) { logger.info( "Leader sent a voter key ({}) in the BEGIN_QUORUM_EPOCH request that doesn't " + @@ -1168,7 +1175,7 @@ private boolean handleBeginQuorumEpochResponse( return handleTopLevelError(topLevelError, responseMetadata); } - if (!hasValidTopicPartition(response, log.topicPartition())) { + if (!BeginQuorumEpochRpc.hasValidTopicPartition(response, log.topicPartition())) { return false; } @@ -1223,7 +1230,7 @@ private EndQuorumEpochResponseData buildEndQuorumEpochResponse( short apiVersion, Errors partitionLevelError ) { - return RaftUtil.singletonEndQuorumEpochResponse( + return EndQuorumEpochRpc.singletonEndQuorumEpochResponse( listenerName, apiVersion, Errors.NONE, @@ -1253,7 +1260,7 @@ private EndQuorumEpochResponseData handleEndQuorumEpochRequest( return new EndQuorumEpochResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()); } - if (!hasValidTopicPartition(request, log.topicPartition())) { + if (!EndQuorumEpochRpc.hasValidTopicPartition(request, log.topicPartition())) { // Until we support multi-raft, we treat topic partition mismatches as invalid requests return new EndQuorumEpochResponseData().setErrorCode(Errors.INVALID_REQUEST.code()); } @@ -1344,7 +1351,7 @@ private boolean handleEndQuorumEpochResponse( return handleTopLevelError(topLevelError, responseMetadata); } - if (!hasValidTopicPartition(response, log.topicPartition())) { + if (!EndQuorumEpochRpc.hasValidTopicPartition(response, log.topicPartition())) { return false; } @@ -1397,7 +1404,7 @@ private FetchResponseData buildFetchResponse( ValidOffsetAndEpoch validOffsetAndEpoch, Optional<LogOffsetMetadata> highWatermark ) { - return RaftUtil.singletonFetchResponse( + return FetchRpc.singletonFetchResponse( listenerName, apiVersion, log.topicPartition(), @@ -1484,7 +1491,7 @@ private CompletableFuture<FetchResponseData> handleFetchRequest( return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); } - if (!hasValidTopicPartition(request, log.topicPartition(), log.topicId())) { + if (!FetchRpc.hasValidTopicPartition(request, log.topicPartition(), log.topicId())) { // Until we support multi-raft, we treat topic
partition mismatches as invalid requests return completedFuture(new FetchResponseData().setErrorCode(Errors.INVALID_REQUEST.code())); } @@ -1675,7 +1682,7 @@ private boolean handleFetchResponse( return handleTopLevelError(topLevelError, responseMetadata); } - if (!hasValidTopicPartition(response, log.topicPartition(), log.topicId())) { + if (!FetchRpc.hasValidTopicPartition(response, log.topicPartition(), log.topicId())) { return false; } // If the ID is valid, we can set the topic name. @@ -1841,7 +1848,7 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest( long currentTimeMs ) { DescribeQuorumRequestData describeQuorumRequestData = (DescribeQuorumRequestData) requestMetadata.data(); - if (!hasValidTopicPartition(describeQuorumRequestData, log.topicPartition())) { + if (!DescribeQuorumRpc.hasValidTopicPartition(describeQuorumRequestData, log.topicPartition())) { return DescribeQuorumRequest.getPartitionLevelErrorResponse( describeQuorumRequestData, Errors.UNKNOWN_TOPIC_OR_PARTITION @@ -1856,7 +1863,7 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest( } LeaderState leaderState = quorum.leaderStateOrThrow(); - return RaftUtil.singletonDescribeQuorumResponse( + return DescribeQuorumRpc.singletonDescribeQuorumResponse( requestMetadata.apiVersion(), log.topicPartition(), quorum.localIdOrThrow(), @@ -1906,7 +1913,7 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( data.topics().get(0).partitions().get(0).partition() ); - return RaftUtil.singletonFetchSnapshotResponse( + return FetchSnapshotRpc.singletonFetchSnapshotResponse( requestMetadata.listenerName(), requestMetadata.apiVersion(), unknownTopicPartition, @@ -1922,7 +1929,7 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( partitionSnapshot.currentLeaderEpoch() ); if (leaderValidation.isPresent()) { - return RaftUtil.singletonFetchSnapshotResponse( + return FetchSnapshotRpc.singletonFetchSnapshotResponse( requestMetadata.listenerName(), requestMetadata.apiVersion(), log.topicPartition(), @@ -1943,7 +1950,7 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( // The bootstrap checkpoint should not be replicated. 
The first leader will // make sure that the content of the bootstrap checkpoint is included in the // partition log - return RaftUtil.singletonFetchSnapshotResponse( + return FetchSnapshotRpc.singletonFetchSnapshotResponse( requestMetadata.listenerName(), requestMetadata.apiVersion(), log.topicPartition(), @@ -1957,7 +1964,7 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( RawSnapshotReader snapshot = snapshotOpt.get(); long snapshotSize = snapshot.sizeInBytes(); if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshotSize) { - return RaftUtil.singletonFetchSnapshotResponse( + return FetchSnapshotRpc.singletonFetchSnapshotResponse( requestMetadata.listenerName(), requestMetadata.apiVersion(), log.topicPartition(), @@ -1997,7 +2004,7 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( currentTimeMs ); - return RaftUtil.singletonFetchSnapshotResponse( + return FetchSnapshotRpc.singletonFetchSnapshotResponse( requestMetadata.listenerName(), requestMetadata.apiVersion(), log.topicPartition(), @@ -2208,7 +2215,7 @@ private CompletableFuture handleAddVoterRequest( ); } - Optional newVoter = RaftUtil.addVoterRequestVoterKey(data); + Optional newVoter = DynamicReconfigRpc.addVoterRequestVoterKey(data); if (newVoter.isEmpty() || newVoter.get().directoryId().isEmpty()) { return completedFuture( new AddRaftVoterResponseData() @@ -2291,7 +2298,7 @@ private CompletableFuture handleRemoveVoterRequest( ); } - Optional oldVoter = RaftUtil.removeVoterRequestVoterKey(data); + Optional oldVoter = DynamicReconfigRpc.removeVoterRequestVoterKey(data); if (oldVoter.isEmpty() || oldVoter.get().directoryId().isEmpty()) { return completedFuture( new RemoveRaftVoterResponseData() @@ -2315,7 +2322,7 @@ private CompletableFuture handleUpdateVoterRequest( if (!hasValidClusterId(data.clusterId())) { return completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( Errors.INCONSISTENT_CLUSTER_ID, requestMetadata.listenerName(), quorum.leaderAndEpoch(), @@ -2327,7 +2334,7 @@ private CompletableFuture handleUpdateVoterRequest( Optional leaderValidationError = validateLeaderOnlyRequest(data.currentLeaderEpoch()); if (leaderValidationError.isPresent()) { return completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( leaderValidationError.get(), requestMetadata.listenerName(), quorum.leaderAndEpoch(), @@ -2336,10 +2343,10 @@ private CompletableFuture handleUpdateVoterRequest( ); } - Optional voter = RaftUtil.updateVoterRequestVoterKey(data); + Optional voter = DynamicReconfigRpc.updateVoterRequestVoterKey(data); if (voter.isEmpty() || voter.get().directoryId().isEmpty()) { return completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( Errors.INVALID_REQUEST, requestMetadata.listenerName(), quorum.leaderAndEpoch(), @@ -2351,7 +2358,7 @@ private CompletableFuture handleUpdateVoterRequest( Endpoints voterEndpoints = Endpoints.fromUpdateVoterRequest(data.listeners()); if (voterEndpoints.address(channel.listenerName()).isEmpty()) { return completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( Errors.INVALID_REQUEST, requestMetadata.listenerName(), quorum.leaderAndEpoch(), @@ -2366,7 +2373,7 @@ private CompletableFuture handleUpdateVoterRequest( supportedKraftVersions.maxSupportedVersion() < supportedKraftVersions.minSupportedVersion() ) { return completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( 
Errors.INVALID_REQUEST, requestMetadata.listenerName(), quorum.leaderAndEpoch(), @@ -2703,7 +2710,7 @@ private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) { responseFuture.whenComplete((response, exception) -> { ApiMessage message = response; if (message == null) { - message = RaftUtil.errorResponse(apiKey, Errors.forException(exception)); + message = ApiMessageUtils.parseErrorResponse(apiKey, Errors.forException(exception)); } RaftResponse.Outbound responseMessage = new RaftResponse.Outbound(request.correlationId(), message); @@ -2757,7 +2764,7 @@ private long maybeSendRequest( if (exception != null) { ApiKeys api = ApiKeys.forId(request.apiKey()); Errors error = Errors.forException(exception); - ApiMessage errorResponse = RaftUtil.errorResponse(api, error); + ApiMessage errorResponse = ApiMessageUtils.parseErrorResponse(api, error); response = new RaftResponse.Inbound( correlationId, @@ -2780,7 +2787,7 @@ private long maybeSendRequest( private EndQuorumEpochRequestData buildEndQuorumEpochRequest( ResignedState state ) { - return RaftUtil.singletonEndQuorumEpochRequest( + return EndQuorumEpochRpc.singletonEndQuorumEpochRequest( log.topicPartition(), clusterId, quorum.epoch(), @@ -2823,7 +2830,7 @@ private long maybeSendRequest( } private BeginQuorumEpochRequestData buildBeginQuorumEpochRequest(ReplicaKey remoteVoter) { - return RaftUtil.singletonBeginQuorumEpochRequest( + return BeginQuorumEpochRpc.singletonBeginQuorumEpochRequest( log.topicPartition(), clusterId, quorum.epoch(), @@ -2835,7 +2842,7 @@ private BeginQuorumEpochRequestData buildBeginQuorumEpochRequest(ReplicaKey remo private VoteRequestData buildVoteRequest(ReplicaKey remoteVoter, boolean preVote) { OffsetAndEpoch endOffset = endOffset(); - return RaftUtil.singletonVoteRequest( + return VoteRpc.singletonVoteRequest( log.topicPartition(), clusterId, quorum.epoch(), @@ -2848,7 +2855,7 @@ private VoteRequestData buildVoteRequest(ReplicaKey remoteVoter, boolean preVote } private FetchRequestData buildFetchRequest() { - FetchRequestData request = RaftUtil.singletonFetchRequest( + FetchRequestData request = FetchRpc.singletonFetchRequest( log.topicPartition(), log.topicId(), fetchPartition -> fetchPartition @@ -2879,7 +2886,7 @@ private long maybeSendFetchToAnyBootstrap(long currentTimeMs) { } private FetchSnapshotRequestData buildFetchSnapshotRequest(OffsetAndEpoch snapshotId, long snapshotSize) { - return RaftUtil.singletonFetchSnapshotRequest( + return FetchSnapshotRpc.singletonFetchSnapshotRequest( clusterId, ReplicaKey.of(quorum().localIdOrSentinel(), quorum.localDirectoryId()), log.topicPartition(), @@ -3247,7 +3254,7 @@ private long maybeSendFetchOrFetchSnapshot(FollowerState state, long currentTime } private UpdateRaftVoterRequestData buildUpdateVoterRequest() { - return RaftUtil.updateVoterRequest( + return DynamicReconfigRpc.updateVoterRequest( clusterId, quorum.localReplicaKeyOrThrow(), quorum.epoch(), diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 43cc1417d3cf3..d527ae02110fe 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -31,6 +31,7 @@ import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.KafkaRaftMetrics; import org.apache.kafka.raft.internals.RemoveVoterHandlerState; +import org.apache.kafka.raft.utils.DynamicReconfigRpc; import org.apache.kafka.server.common.KRaftVersion; 
import org.slf4j.Logger; @@ -223,7 +224,7 @@ public void resetAddVoterHandlerState( addVoterHandlerState.ifPresent( handlerState -> handlerState .future() - .complete(RaftUtil.addVoterResponse(error, message)) + .complete(DynamicReconfigRpc.addVoterResponse(error, message)) ); addVoterHandlerState = state; updateUncommittedVoterChangeMetric(); @@ -241,7 +242,7 @@ public void resetRemoveVoterHandlerState( removeVoterHandlerState.ifPresent( handlerState -> handlerState .future() - .complete(RaftUtil.removeVoterResponse(error, message)) + .complete(DynamicReconfigRpc.removeVoterResponse(error, message)) ); removeVoterHandlerState = state; updateUncommittedVoterChangeMetric(); @@ -386,6 +387,7 @@ public void appendStartOfEpochControlRecords(VoterSet.VoterNode localVoterNode, return builder.build(); } }); + accumulator.forceDrain(); } public long appendVotersRecord(VoterSet voters, long currentTimeMs) { diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java deleted file mode 100644 index 12c48955b39b7..0000000000000 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ /dev/null @@ -1,768 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.raft; - -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.feature.SupportedVersionRange; -import org.apache.kafka.common.message.AddRaftVoterRequestData; -import org.apache.kafka.common.message.AddRaftVoterResponseData; -import org.apache.kafka.common.message.ApiVersionsResponseData; -import org.apache.kafka.common.message.BeginQuorumEpochRequestData; -import org.apache.kafka.common.message.BeginQuorumEpochResponseData; -import org.apache.kafka.common.message.DescribeQuorumRequestData; -import org.apache.kafka.common.message.DescribeQuorumResponseData; -import org.apache.kafka.common.message.EndQuorumEpochRequestData; -import org.apache.kafka.common.message.EndQuorumEpochResponseData; -import org.apache.kafka.common.message.FetchRequestData; -import org.apache.kafka.common.message.FetchResponseData; -import org.apache.kafka.common.message.FetchSnapshotRequestData; -import org.apache.kafka.common.message.FetchSnapshotResponseData; -import org.apache.kafka.common.message.RemoveRaftVoterRequestData; -import org.apache.kafka.common.message.RemoveRaftVoterResponseData; -import org.apache.kafka.common.message.UpdateRaftVoterRequestData; -import org.apache.kafka.common.message.UpdateRaftVoterResponseData; -import org.apache.kafka.common.message.VoteRequestData; -import org.apache.kafka.common.message.VoteResponseData; -import org.apache.kafka.common.network.ListenerName; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.common.protocol.Errors; - -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.function.Consumer; -import java.util.function.UnaryOperator; -import java.util.stream.Collectors; - -@SuppressWarnings({ "ClassDataAbstractionCoupling", "ClassFanOutComplexity" }) -public class RaftUtil { - - public static ApiMessage errorResponse(ApiKeys apiKey, Errors error) { - switch (apiKey) { - case VOTE: - return new VoteResponseData().setErrorCode(error.code()); - case BEGIN_QUORUM_EPOCH: - return new BeginQuorumEpochResponseData().setErrorCode(error.code()); - case END_QUORUM_EPOCH: - return new EndQuorumEpochResponseData().setErrorCode(error.code()); - case FETCH: - return new FetchResponseData().setErrorCode(error.code()); - case FETCH_SNAPSHOT: - return new FetchSnapshotResponseData().setErrorCode(error.code()); - case API_VERSIONS: - return new ApiVersionsResponseData().setErrorCode(error.code()); - case UPDATE_RAFT_VOTER: - return new UpdateRaftVoterResponseData().setErrorCode(error.code()); - default: - throw new IllegalArgumentException("Received response for unexpected request type: " + apiKey); - } - } - - public static FetchRequestData singletonFetchRequest( - TopicPartition topicPartition, - Uuid topicId, - Consumer partitionConsumer - ) { - FetchRequestData.FetchPartition fetchPartition = - new FetchRequestData.FetchPartition() - .setPartition(topicPartition.partition()); - partitionConsumer.accept(fetchPartition); - - FetchRequestData.FetchTopic fetchTopic = - new FetchRequestData.FetchTopic() - .setTopic(topicPartition.topic()) - .setTopicId(topicId) - .setPartitions(Collections.singletonList(fetchPartition)); - - return new FetchRequestData() - .setTopics(Collections.singletonList(fetchTopic)); - } - - public static FetchResponseData singletonFetchResponse( - ListenerName listenerName, - short 
apiVersion, - TopicPartition topicPartition, - Uuid topicId, - Errors topLevelError, - int leaderId, - Endpoints endpoints, - Consumer partitionConsumer - ) { - FetchResponseData.PartitionData fetchablePartition = - new FetchResponseData.PartitionData(); - - fetchablePartition.setPartitionIndex(topicPartition.partition()); - - partitionConsumer.accept(fetchablePartition); - - FetchResponseData.FetchableTopicResponse fetchableTopic = - new FetchResponseData.FetchableTopicResponse() - .setTopic(topicPartition.topic()) - .setTopicId(topicId) - .setPartitions(Collections.singletonList(fetchablePartition)); - - FetchResponseData response = new FetchResponseData(); - - if (apiVersion >= 17) { - Optional address = endpoints.address(listenerName); - if (address.isPresent() && leaderId >= 0) { - // Populate the node endpoints - FetchResponseData.NodeEndpointCollection nodeEndpoints = new FetchResponseData.NodeEndpointCollection(1); - nodeEndpoints.add( - new FetchResponseData.NodeEndpoint() - .setNodeId(leaderId) - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()) - ); - response.setNodeEndpoints(nodeEndpoints); - } - } - - return response - .setErrorCode(topLevelError.code()) - .setResponses(Collections.singletonList(fetchableTopic)); - } - - public static VoteRequestData singletonVoteRequest( - TopicPartition topicPartition, - String clusterId, - int replicaEpoch, - ReplicaKey replicaKey, - ReplicaKey voterKey, - int lastEpoch, - long lastEpochEndOffset, - boolean preVote - ) { - return new VoteRequestData() - .setClusterId(clusterId) - .setVoterId(voterKey.id()) - .setTopics( - Collections.singletonList( - new VoteRequestData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new VoteRequestData.PartitionData() - .setPartitionIndex(topicPartition.partition()) - .setReplicaEpoch(replicaEpoch) - .setReplicaId(replicaKey.id()) - .setReplicaDirectoryId( - replicaKey - .directoryId() - .orElse(ReplicaKey.NO_DIRECTORY_ID) - ) - .setVoterDirectoryId( - voterKey - .directoryId() - .orElse(ReplicaKey.NO_DIRECTORY_ID) - ) - .setLastOffsetEpoch(lastEpoch) - .setLastOffset(lastEpochEndOffset) - .setPreVote(preVote) - ) - ) - ) - ); - } - - public static VoteResponseData singletonVoteResponse( - ListenerName listenerName, - short apiVersion, - Errors topLevelError, - TopicPartition topicPartition, - Errors partitionLevelError, - int leaderEpoch, - int leaderId, - boolean voteGranted, - Endpoints endpoints - ) { - VoteResponseData.PartitionData partitionData = new VoteResponseData.PartitionData() - .setErrorCode(partitionLevelError.code()) - .setLeaderId(leaderId) - .setLeaderEpoch(leaderEpoch) - .setVoteGranted(voteGranted); - - VoteResponseData response = new VoteResponseData() - .setErrorCode(topLevelError.code()) - .setTopics(Collections.singletonList( - new VoteResponseData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions(Collections.singletonList(partitionData)))); - - if (apiVersion >= 1) { - Optional address = endpoints.address(listenerName); - if (address.isPresent() && leaderId >= 0) { - // Populate the node endpoints - VoteResponseData.NodeEndpointCollection nodeEndpoints = new VoteResponseData.NodeEndpointCollection(1); - nodeEndpoints.add( - new VoteResponseData.NodeEndpoint() - .setNodeId(leaderId) - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()) - ); - response.setNodeEndpoints(nodeEndpoints); - } - } - - return response; - } - - public static FetchSnapshotRequestData 
singletonFetchSnapshotRequest( - String clusterId, - ReplicaKey replicaKey, - TopicPartition topicPartition, - int epoch, - OffsetAndEpoch offsetAndEpoch, - int maxBytes, - long position - ) { - FetchSnapshotRequestData.SnapshotId snapshotId = new FetchSnapshotRequestData.SnapshotId() - .setEndOffset(offsetAndEpoch.offset()) - .setEpoch(offsetAndEpoch.epoch()); - - FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = new FetchSnapshotRequestData.PartitionSnapshot() - .setPartition(topicPartition.partition()) - .setCurrentLeaderEpoch(epoch) - .setSnapshotId(snapshotId) - .setPosition(position) - .setReplicaDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); - - return new FetchSnapshotRequestData() - .setClusterId(clusterId) - .setReplicaId(replicaKey.id()) - .setMaxBytes(maxBytes) - .setTopics( - Collections.singletonList( - new FetchSnapshotRequestData.TopicSnapshot() - .setName(topicPartition.topic()) - .setPartitions(Collections.singletonList(partitionSnapshot)) - ) - ); - } - - /** - * Creates a FetchSnapshotResponseData with a single PartitionSnapshot for the topic partition. - * - * The partition index will already be populated when calling operator. - * - * @param listenerName the listener used to accept the request - * @param apiVersion the api version of the request - * @param topicPartition the topic partition to include - * @param leaderId the id of the leader - * @param endpoints the endpoints of the leader - * @param operator unary operator responsible for populating all of the appropriate fields - * @return the created fetch snapshot response data - */ - public static FetchSnapshotResponseData singletonFetchSnapshotResponse( - ListenerName listenerName, - short apiVersion, - TopicPartition topicPartition, - int leaderId, - Endpoints endpoints, - UnaryOperator operator - ) { - FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot = operator.apply( - new FetchSnapshotResponseData.PartitionSnapshot().setIndex(topicPartition.partition()) - ); - - FetchSnapshotResponseData response = new FetchSnapshotResponseData() - .setTopics( - Collections.singletonList( - new FetchSnapshotResponseData.TopicSnapshot() - .setName(topicPartition.topic()) - .setPartitions(Collections.singletonList(partitionSnapshot)) - ) - ); - - if (apiVersion >= 1) { - Optional address = endpoints.address(listenerName); - if (address.isPresent() && leaderId >= 0) { - // Populate the node endpoints - FetchSnapshotResponseData.NodeEndpointCollection nodeEndpoints = - new FetchSnapshotResponseData.NodeEndpointCollection(1); - nodeEndpoints.add( - new FetchSnapshotResponseData.NodeEndpoint() - .setNodeId(leaderId) - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()) - ); - response.setNodeEndpoints(nodeEndpoints); - } - } - - return response; - } - - public static BeginQuorumEpochRequestData singletonBeginQuorumEpochRequest( - TopicPartition topicPartition, - String clusterId, - int leaderEpoch, - int leaderId, - Endpoints leaderEndpoints, - ReplicaKey voterKey - ) { - return new BeginQuorumEpochRequestData() - .setClusterId(clusterId) - .setVoterId(voterKey.id()) - .setTopics( - Collections.singletonList( - new BeginQuorumEpochRequestData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new BeginQuorumEpochRequestData.PartitionData() - .setPartitionIndex(topicPartition.partition()) - .setLeaderEpoch(leaderEpoch) - .setLeaderId(leaderId) - 
.setVoterDirectoryId(voterKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) - ) - ) - ) - ) - .setLeaderEndpoints(leaderEndpoints.toBeginQuorumEpochRequest()); - } - - public static BeginQuorumEpochResponseData singletonBeginQuorumEpochResponse( - ListenerName listenerName, - short apiVersion, - Errors topLevelError, - TopicPartition topicPartition, - Errors partitionLevelError, - int leaderEpoch, - int leaderId, - Endpoints endpoints - ) { - BeginQuorumEpochResponseData response = new BeginQuorumEpochResponseData() - .setErrorCode(topLevelError.code()) - .setTopics( - Collections.singletonList( - new BeginQuorumEpochResponseData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new BeginQuorumEpochResponseData.PartitionData() - .setErrorCode(partitionLevelError.code()) - .setLeaderId(leaderId) - .setLeaderEpoch(leaderEpoch) - ) - ) - ) - ); - - if (apiVersion >= 1) { - Optional address = endpoints.address(listenerName); - if (address.isPresent() && leaderId >= 0) { - // Populate the node endpoints - BeginQuorumEpochResponseData.NodeEndpointCollection nodeEndpoints = - new BeginQuorumEpochResponseData.NodeEndpointCollection(1); - nodeEndpoints.add( - new BeginQuorumEpochResponseData.NodeEndpoint() - .setNodeId(leaderId) - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()) - ); - response.setNodeEndpoints(nodeEndpoints); - } - } - - return response; - } - - public static EndQuorumEpochRequestData singletonEndQuorumEpochRequest( - TopicPartition topicPartition, - String clusterId, - int leaderEpoch, - int leaderId, - List preferredReplicaKeys - ) { - List preferredSuccessors = preferredReplicaKeys - .stream() - .map(ReplicaKey::id) - .collect(Collectors.toList()); - - List preferredCandidates = preferredReplicaKeys - .stream() - .map(replicaKey -> new EndQuorumEpochRequestData.ReplicaInfo() - .setCandidateId(replicaKey.id()) - .setCandidateDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) - ) - .collect(Collectors.toList()); - - return new EndQuorumEpochRequestData() - .setClusterId(clusterId) - .setTopics( - Collections.singletonList( - new EndQuorumEpochRequestData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new EndQuorumEpochRequestData.PartitionData() - .setPartitionIndex(topicPartition.partition()) - .setLeaderEpoch(leaderEpoch) - .setLeaderId(leaderId) - .setPreferredSuccessors(preferredSuccessors) - .setPreferredCandidates(preferredCandidates) - ) - ) - ) - ); - - } - - public static EndQuorumEpochResponseData singletonEndQuorumEpochResponse( - ListenerName listenerName, - short apiVersion, - Errors topLevelError, - TopicPartition topicPartition, - Errors partitionLevelError, - int leaderEpoch, - int leaderId, - Endpoints endpoints - ) { - EndQuorumEpochResponseData response = new EndQuorumEpochResponseData() - .setErrorCode(topLevelError.code()) - .setTopics(Collections.singletonList( - new EndQuorumEpochResponseData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions(Collections.singletonList( - new EndQuorumEpochResponseData.PartitionData() - .setErrorCode(partitionLevelError.code()) - .setLeaderId(leaderId) - .setLeaderEpoch(leaderEpoch) - ))) - ); - - if (apiVersion >= 1) { - Optional address = endpoints.address(listenerName); - if (address.isPresent() && leaderId >= 0) { - // Populate the node endpoints - EndQuorumEpochResponseData.NodeEndpointCollection nodeEndpoints = - new 
EndQuorumEpochResponseData.NodeEndpointCollection(1); - nodeEndpoints.add( - new EndQuorumEpochResponseData.NodeEndpoint() - .setNodeId(leaderId) - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()) - ); - response.setNodeEndpoints(nodeEndpoints); - } - } - - return response; - } - - - public static DescribeQuorumRequestData singletonDescribeQuorumRequest( - TopicPartition topicPartition - ) { - - return new DescribeQuorumRequestData() - .setTopics( - Collections.singletonList( - new DescribeQuorumRequestData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new DescribeQuorumRequestData.PartitionData() - .setPartitionIndex(topicPartition.partition()) - ) - ) - ) - ); - } - - public static DescribeQuorumResponseData singletonDescribeQuorumResponse( - short apiVersion, - TopicPartition topicPartition, - int leaderId, - int leaderEpoch, - long highWatermark, - Collection voters, - Collection observers, - long currentTimeMs - ) { - DescribeQuorumResponseData response = new DescribeQuorumResponseData() - .setTopics( - Collections.singletonList( - new DescribeQuorumResponseData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new DescribeQuorumResponseData.PartitionData() - .setPartitionIndex(topicPartition.partition()) - .setErrorCode(Errors.NONE.code()) - .setLeaderId(leaderId) - .setLeaderEpoch(leaderEpoch) - .setHighWatermark(highWatermark) - .setCurrentVoters(toReplicaStates(apiVersion, leaderId, voters, currentTimeMs)) - .setObservers(toReplicaStates(apiVersion, leaderId, observers, currentTimeMs)))))); - if (apiVersion >= 2) { - DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(voters.size()); - for (LeaderState.ReplicaState voter : voters) { - nodes.add( - new DescribeQuorumResponseData.Node() - .setNodeId(voter.replicaKey().id()) - .setListeners(voter.listeners().toDescribeQuorumResponseListeners()) - ); - } - response.setNodes(nodes); - } - return response; - } - - public static AddRaftVoterRequestData addVoterRequest( - String clusterId, - int timeoutMs, - ReplicaKey voter, - Endpoints listeners - ) { - return new AddRaftVoterRequestData() - .setClusterId(clusterId) - .setTimeoutMs(timeoutMs) - .setVoterId(voter.id()) - .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) - .setListeners(listeners.toAddVoterRequest()); - } - - public static AddRaftVoterResponseData addVoterResponse( - Errors error, - String errorMessage - ) { - errorMessage = errorMessage == null ? error.message() : errorMessage; - - return new AddRaftVoterResponseData() - .setErrorCode(error.code()) - .setErrorMessage(errorMessage); - } - - public static RemoveRaftVoterRequestData removeVoterRequest( - String clusterId, - ReplicaKey voter - ) { - return new RemoveRaftVoterRequestData() - .setClusterId(clusterId) - .setVoterId(voter.id()) - .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); - } - - public static RemoveRaftVoterResponseData removeVoterResponse( - Errors error, - String errorMessage - ) { - errorMessage = errorMessage == null ? 
error.message() : errorMessage; - - return new RemoveRaftVoterResponseData() - .setErrorCode(error.code()) - .setErrorMessage(errorMessage); - } - - public static UpdateRaftVoterRequestData updateVoterRequest( - String clusterId, - ReplicaKey voter, - int epoch, - SupportedVersionRange supportedVersions, - Endpoints endpoints - ) { - UpdateRaftVoterRequestData request = new UpdateRaftVoterRequestData() - .setClusterId(clusterId) - .setCurrentLeaderEpoch(epoch) - .setVoterId(voter.id()) - .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) - .setListeners(endpoints.toUpdateVoterRequest()); - - request.kRaftVersionFeature() - .setMinSupportedVersion(supportedVersions.min()) - .setMaxSupportedVersion(supportedVersions.max()); - - return request; - } - - public static UpdateRaftVoterResponseData updateVoterResponse( - Errors error, - ListenerName listenerName, - LeaderAndEpoch leaderAndEpoch, - Endpoints endpoints - ) { - UpdateRaftVoterResponseData response = new UpdateRaftVoterResponseData() - .setErrorCode(error.code()); - - response.currentLeader() - .setLeaderId(leaderAndEpoch.leaderId().orElse(-1)) - .setLeaderEpoch(leaderAndEpoch.epoch()); - - Optional address = endpoints.address(listenerName); - if (address.isPresent()) { - response.currentLeader() - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()); - } - - return response; - } - - private static List toReplicaStates( - short apiVersion, - int leaderId, - Collection states, - long currentTimeMs - ) { - return states - .stream() - .map(replicaState -> toReplicaState(apiVersion, leaderId, replicaState, currentTimeMs)) - .collect(Collectors.toList()); - } - - private static DescribeQuorumResponseData.ReplicaState toReplicaState( - short apiVersion, - int leaderId, - LeaderState.ReplicaState replicaState, - long currentTimeMs - ) { - final long lastCaughtUpTimestamp; - final long lastFetchTimestamp; - if (replicaState.replicaKey().id() == leaderId) { - lastCaughtUpTimestamp = currentTimeMs; - lastFetchTimestamp = currentTimeMs; - } else { - lastCaughtUpTimestamp = replicaState.lastCaughtUpTimestamp(); - lastFetchTimestamp = replicaState.lastFetchTimestamp(); - } - DescribeQuorumResponseData.ReplicaState replicaStateData = new DescribeQuorumResponseData.ReplicaState() - .setReplicaId(replicaState.replicaKey().id()) - .setLogEndOffset(replicaState.endOffset().map(LogOffsetMetadata::offset).orElse(-1L)) - .setLastCaughtUpTimestamp(lastCaughtUpTimestamp) - .setLastFetchTimestamp(lastFetchTimestamp); - - if (apiVersion >= 2) { - replicaStateData.setReplicaDirectoryId(replicaState.replicaKey().directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); - } - return replicaStateData; - } - - public static Optional voteRequestVoterKey( - VoteRequestData request, - VoteRequestData.PartitionData partition - ) { - if (request.voterId() < 0) { - return Optional.empty(); - } else { - return Optional.of(ReplicaKey.of(request.voterId(), partition.voterDirectoryId())); - } - } - - public static Optional beginQuorumEpochRequestVoterKey( - BeginQuorumEpochRequestData request, - BeginQuorumEpochRequestData.PartitionData partition - ) { - if (request.voterId() < 0) { - return Optional.empty(); - } else { - return Optional.of(ReplicaKey.of(request.voterId(), partition.voterDirectoryId())); - } - } - - public static Optional addVoterRequestVoterKey(AddRaftVoterRequestData request) { - if (request.voterId() < 0) { - return Optional.empty(); - } else { - return Optional.of(ReplicaKey.of(request.voterId(), 
request.voterDirectoryId())); - } - } - - public static Optional removeVoterRequestVoterKey(RemoveRaftVoterRequestData request) { - if (request.voterId() < 0) { - return Optional.empty(); - } else { - return Optional.of(ReplicaKey.of(request.voterId(), request.voterDirectoryId())); - } - } - - public static Optional updateVoterRequestVoterKey(UpdateRaftVoterRequestData request) { - if (request.voterId() < 0) { - return Optional.empty(); - } else { - return Optional.of(ReplicaKey.of(request.voterId(), request.voterDirectoryId())); - } - } - - static boolean hasValidTopicPartition(FetchRequestData data, TopicPartition topicPartition, Uuid topicId) { - return data.topics().size() == 1 && - data.topics().get(0).topicId().equals(topicId) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partition() == topicPartition.partition(); - } - - static boolean hasValidTopicPartition(FetchResponseData data, TopicPartition topicPartition, Uuid topicId) { - return data.responses().size() == 1 && - data.responses().get(0).topicId().equals(topicId) && - data.responses().get(0).partitions().size() == 1 && - data.responses().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); - } - - static boolean hasValidTopicPartition(VoteResponseData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); - } - - static boolean hasValidTopicPartition(VoteRequestData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); - } - - static boolean hasValidTopicPartition(BeginQuorumEpochRequestData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); - } - - static boolean hasValidTopicPartition(BeginQuorumEpochResponseData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); - } - - static boolean hasValidTopicPartition(EndQuorumEpochRequestData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); - } - - static boolean hasValidTopicPartition(EndQuorumEpochResponseData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); - } - - static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicPartition topicPartition) { - return data.topics().size() == 1 && - 
data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); - } -} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java index 1f7ea2f61c47b..7d512eb0174a9 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java @@ -28,9 +28,9 @@ import org.apache.kafka.raft.Endpoints; import org.apache.kafka.raft.LeaderState; import org.apache.kafka.raft.LogOffsetMetadata; -import org.apache.kafka.raft.RaftUtil; import org.apache.kafka.raft.ReplicaKey; import org.apache.kafka.raft.VoterSet; +import org.apache.kafka.raft.utils.DynamicReconfigRpc; import org.apache.kafka.server.common.KRaftVersion; import org.slf4j.Logger; @@ -92,7 +92,7 @@ public CompletableFuture handleAddVoterRequest( // Check if there are any pending voter change requests if (leaderState.isOperationPending(currentTimeMs)) { return CompletableFuture.completedFuture( - RaftUtil.addVoterResponse( + DynamicReconfigRpc.addVoterResponse( Errors.REQUEST_TIMED_OUT, "Request timed out waiting for leader to handle previous voter change request" ) @@ -103,7 +103,7 @@ public CompletableFuture handleAddVoterRequest( Optional highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset); if (highWatermark.isEmpty()) { return CompletableFuture.completedFuture( - RaftUtil.addVoterResponse( + DynamicReconfigRpc.addVoterResponse( Errors.REQUEST_TIMED_OUT, "Request timed out waiting for leader to establish HWM and fence previous voter changes" ) @@ -114,7 +114,7 @@ public CompletableFuture handleAddVoterRequest( KRaftVersion kraftVersion = partitionState.lastKraftVersion(); if (!kraftVersion.isReconfigSupported()) { return CompletableFuture.completedFuture( - RaftUtil.addVoterResponse( + DynamicReconfigRpc.addVoterResponse( Errors.UNSUPPORTED_VERSION, String.format( "Cluster doesn't support adding voter because the %s feature is %s", @@ -129,7 +129,7 @@ public CompletableFuture handleAddVoterRequest( Optional> votersEntry = partitionState.lastVoterSetEntry(); if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) { return CompletableFuture.completedFuture( - RaftUtil.addVoterResponse( + DynamicReconfigRpc.addVoterResponse( Errors.REQUEST_TIMED_OUT, String.format( "Request timed out waiting for voters to commit the latest voter change at %s with HWM %d", @@ -144,7 +144,7 @@ public CompletableFuture handleAddVoterRequest( VoterSet voters = votersEntry.get().value(); if (voters.voterIds().contains(voterKey.id())) { return CompletableFuture.completedFuture( - RaftUtil.addVoterResponse( + DynamicReconfigRpc.addVoterResponse( Errors.DUPLICATE_VOTER, String.format( "The voter id for %s is already part of the set of voters %s.", @@ -174,7 +174,7 @@ public CompletableFuture handleAddVoterRequest( ); if (timeout.isEmpty()) { return CompletableFuture.completedFuture( - RaftUtil.addVoterResponse( + DynamicReconfigRpc.addVoterResponse( Errors.REQUEST_TIMED_OUT, String.format("New voter %s is not ready to receive requests", voterKey) ) diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/DefaultRequestSender.java b/raft/src/main/java/org/apache/kafka/raft/internals/DefaultRequestSender.java index 0cee3c255d1b0..55b197255bff8 100644 --- 
a/raft/src/main/java/org/apache/kafka/raft/internals/DefaultRequestSender.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/DefaultRequestSender.java @@ -26,8 +26,8 @@ import org.apache.kafka.raft.RaftMessageQueue; import org.apache.kafka.raft.RaftRequest; import org.apache.kafka.raft.RaftResponse; -import org.apache.kafka.raft.RaftUtil; import org.apache.kafka.raft.RequestManager; +import org.apache.kafka.raft.utils.ApiMessageUtils; import org.slf4j.Logger; @@ -89,7 +89,7 @@ public OptionalLong send( if (exception != null) { ApiKeys api = ApiKeys.forId(request.apiKey()); Errors error = Errors.forException(exception); - ApiMessage errorResponse = RaftUtil.errorResponse(api, error); + ApiMessage errorResponse = ApiMessageUtils.parseErrorResponse(api, error); response = new RaftResponse.Inbound( correlationId, diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java index 2dea86d593bfe..320e7c982ef7c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java @@ -23,9 +23,9 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.raft.LeaderState; import org.apache.kafka.raft.LogOffsetMetadata; -import org.apache.kafka.raft.RaftUtil; import org.apache.kafka.raft.ReplicaKey; import org.apache.kafka.raft.VoterSet; +import org.apache.kafka.raft.utils.DynamicReconfigRpc; import org.apache.kafka.server.common.KRaftVersion; import org.slf4j.Logger; @@ -82,7 +82,7 @@ public CompletableFuture handleRemoveVoterRequest( // Check if there are any pending voter change requests if (leaderState.isOperationPending(currentTimeMs)) { return CompletableFuture.completedFuture( - RaftUtil.removeVoterResponse( + DynamicReconfigRpc.removeVoterResponse( Errors.REQUEST_TIMED_OUT, "Request timed out waiting for leader to handle previous voter change request" ) @@ -93,7 +93,7 @@ public CompletableFuture handleRemoveVoterRequest( Optional highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset); if (highWatermark.isEmpty()) { return CompletableFuture.completedFuture( - RaftUtil.removeVoterResponse( + DynamicReconfigRpc.removeVoterResponse( Errors.REQUEST_TIMED_OUT, "Request timed out waiting for leader to establish HWM and fence previous voter changes" ) @@ -104,7 +104,7 @@ public CompletableFuture handleRemoveVoterRequest( KRaftVersion kraftVersion = partitionState.lastKraftVersion(); if (!kraftVersion.isReconfigSupported()) { return CompletableFuture.completedFuture( - RaftUtil.removeVoterResponse( + DynamicReconfigRpc.removeVoterResponse( Errors.UNSUPPORTED_VERSION, String.format( "Cluster doesn't support removing voter because the %s feature is %s", @@ -119,7 +119,7 @@ public CompletableFuture handleRemoveVoterRequest( Optional> votersEntry = partitionState.lastVoterSetEntry(); if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) { return CompletableFuture.completedFuture( - RaftUtil.removeVoterResponse( + DynamicReconfigRpc.removeVoterResponse( Errors.REQUEST_TIMED_OUT, String.format( "Request timed out waiting for voters to commit the latest voter change at %s with HWM %d", @@ -134,7 +134,7 @@ public CompletableFuture handleRemoveVoterRequest( Optional newVoters = votersEntry.get().value().removeVoter(voterKey); if (newVoters.isEmpty()) { return CompletableFuture.completedFuture( - RaftUtil.removeVoterResponse( + 
DynamicReconfigRpc.removeVoterResponse( Errors.VOTER_NOT_FOUND, String.format( "Cannot remove voter %s from the set of voters %s", diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java index 335e1b02a22c1..2137d64211c7a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java @@ -26,9 +26,9 @@ import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.raft.LeaderState; import org.apache.kafka.raft.LogOffsetMetadata; -import org.apache.kafka.raft.RaftUtil; import org.apache.kafka.raft.ReplicaKey; import org.apache.kafka.raft.VoterSet; +import org.apache.kafka.raft.utils.DynamicReconfigRpc; import org.apache.kafka.server.common.KRaftVersion; import java.util.Optional; @@ -83,7 +83,7 @@ public CompletableFuture handleUpdateVoterRequest( // Check if there are any pending voter change requests if (leaderState.isOperationPending(currentTimeMs)) { return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( Errors.REQUEST_TIMED_OUT, requestListenerName, new LeaderAndEpoch( @@ -99,7 +99,7 @@ public CompletableFuture handleUpdateVoterRequest( Optional highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset); if (highWatermark.isEmpty()) { return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( Errors.REQUEST_TIMED_OUT, requestListenerName, new LeaderAndEpoch( @@ -116,7 +116,7 @@ public CompletableFuture handleUpdateVoterRequest( KRaftVersion kraftVersion = partitionState.lastKraftVersion(); if (!kraftVersion.isReconfigSupported()) { return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( Errors.UNSUPPORTED_VERSION, requestListenerName, new LeaderAndEpoch( @@ -132,7 +132,7 @@ public CompletableFuture handleUpdateVoterRequest( Optional> votersEntry = partitionState.lastVoterSetEntry(); if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) { return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( Errors.REQUEST_TIMED_OUT, requestListenerName, new LeaderAndEpoch( @@ -147,7 +147,7 @@ public CompletableFuture handleUpdateVoterRequest( // Check that the supported version range is valid if (!validVersionRange(kraftVersion, supportedKraftVersions)) { return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( Errors.INVALID_REQUEST, requestListenerName, new LeaderAndEpoch( @@ -162,7 +162,7 @@ public CompletableFuture handleUpdateVoterRequest( // Check that endpoinds includes the default listener if (voterEndpoints.address(defaultListenerName).isEmpty()) { return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( Errors.INVALID_REQUEST, requestListenerName, new LeaderAndEpoch( @@ -190,7 +190,7 @@ public CompletableFuture handleUpdateVoterRequest( ); if (updatedVoters.isEmpty()) { return CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( Errors.VOTER_NOT_FOUND, requestListenerName, new LeaderAndEpoch( @@ -206,7 +206,7 @@ public CompletableFuture handleUpdateVoterRequest( // Reply immediately and don't wait for the change to commit return 
CompletableFuture.completedFuture( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( Errors.NONE, requestListenerName, new LeaderAndEpoch( diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java index c0ac6c5189983..e62f29f2a02bc 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.utils.Timer; import org.apache.kafka.raft.Endpoints; import org.apache.kafka.raft.LeaderAndEpoch; -import org.apache.kafka.raft.RaftUtil; +import org.apache.kafka.raft.utils.DynamicReconfigRpc; import java.util.concurrent.CompletableFuture; @@ -57,7 +57,7 @@ public void completeFuture( Endpoints leaderEndpoints ) { future.complete( - RaftUtil.updateVoterResponse( + DynamicReconfigRpc.updateVoterResponse( error, requestListenerName, leaderAndEpoch, diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/ApiMessageUtils.java b/raft/src/main/java/org/apache/kafka/raft/utils/ApiMessageUtils.java new file mode 100644 index 0000000000000..7807ce3c6918c --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/utils/ApiMessageUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.utils; + +import org.apache.kafka.common.message.ApiVersionsResponseData; +import org.apache.kafka.common.message.BeginQuorumEpochResponseData; +import org.apache.kafka.common.message.EndQuorumEpochResponseData; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.message.FetchSnapshotResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; +import org.apache.kafka.common.message.VoteResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; + +public class ApiMessageUtils { + public static ApiMessage parseErrorResponse(ApiKeys apiKey, Errors error) { + return switch (apiKey) { + case VOTE -> new VoteResponseData().setErrorCode(error.code()); + case BEGIN_QUORUM_EPOCH -> new BeginQuorumEpochResponseData().setErrorCode(error.code()); + case END_QUORUM_EPOCH -> new EndQuorumEpochResponseData().setErrorCode(error.code()); + case FETCH -> new FetchResponseData().setErrorCode(error.code()); + case FETCH_SNAPSHOT -> new FetchSnapshotResponseData().setErrorCode(error.code()); + case API_VERSIONS -> new ApiVersionsResponseData().setErrorCode(error.code()); + case UPDATE_RAFT_VOTER -> new UpdateRaftVoterResponseData().setErrorCode(error.code()); + default -> throw new IllegalArgumentException("Received response for unexpected request type: " + apiKey); + }; + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/BeginQuorumEpochRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/BeginQuorumEpochRpc.java new file mode 100644 index 0000000000000..b09d4e4677f8c --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/utils/BeginQuorumEpochRpc.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.utils; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.BeginQuorumEpochRequestData; +import org.apache.kafka.common.message.BeginQuorumEpochResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.ReplicaKey; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Optional; + + +public class BeginQuorumEpochRpc { + public static BeginQuorumEpochRequestData singletonBeginQuorumEpochRequest( + TopicPartition topicPartition, + String clusterId, + int leaderEpoch, + int leaderId, + Endpoints leaderEndpoints, + ReplicaKey voterKey + ) { + return new BeginQuorumEpochRequestData() + .setClusterId(clusterId) + .setVoterId(voterKey.id()) + .setTopics( + Collections.singletonList( + new BeginQuorumEpochRequestData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new BeginQuorumEpochRequestData.PartitionData() + .setPartitionIndex(topicPartition.partition()) + .setLeaderEpoch(leaderEpoch) + .setLeaderId(leaderId) + .setVoterDirectoryId(voterKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + ) + ) + ) + ) + .setLeaderEndpoints(leaderEndpoints.toBeginQuorumEpochRequest()); + } + + public static BeginQuorumEpochResponseData singletonBeginQuorumEpochResponse( + ListenerName listenerName, + short apiVersion, + Errors topLevelError, + TopicPartition topicPartition, + Errors partitionLevelError, + int leaderEpoch, + int leaderId, + Endpoints endpoints + ) { + BeginQuorumEpochResponseData response = new BeginQuorumEpochResponseData() + .setErrorCode(topLevelError.code()) + .setTopics( + Collections.singletonList( + new BeginQuorumEpochResponseData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new BeginQuorumEpochResponseData.PartitionData() + .setErrorCode(partitionLevelError.code()) + .setLeaderId(leaderId) + .setLeaderEpoch(leaderEpoch) + ) + ) + ) + ); + + if (apiVersion >= 1) { + Optional<InetSocketAddress> address = endpoints.address(listenerName); + if (address.isPresent() && leaderId >= 0) { + // Populate the node endpoints + BeginQuorumEpochResponseData.NodeEndpointCollection nodeEndpoints = + new BeginQuorumEpochResponseData.NodeEndpointCollection(1); + nodeEndpoints.add( + new BeginQuorumEpochResponseData.NodeEndpoint() + .setNodeId(leaderId) + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()) + ); + response.setNodeEndpoints(nodeEndpoints); + } + } + + return response; + } + + public static boolean hasValidTopicPartition(BeginQuorumEpochRequestData data, TopicPartition topicPartition) { + return data.topics().size() == 1 && + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + } + + public static boolean hasValidTopicPartition(BeginQuorumEpochResponseData data, TopicPartition topicPartition) { + return data.topics().size() == 1 && + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + } + + public static Optional<ReplicaKey> beginQuorumEpochRequestVoterKey( + BeginQuorumEpochRequestData request, + BeginQuorumEpochRequestData.PartitionData
partition + ) { + if (request.voterId() < 0) { + return Optional.empty(); + } else { + return Optional.of(ReplicaKey.of(request.voterId(), partition.voterDirectoryId())); + } + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/DescribeQuorumRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/DescribeQuorumRpc.java new file mode 100644 index 0000000000000..197302ab7733d --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/utils/DescribeQuorumRpc.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.utils; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.DescribeQuorumRequestData; +import org.apache.kafka.common.message.DescribeQuorumResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.raft.LeaderState; +import org.apache.kafka.raft.LogOffsetMetadata; +import org.apache.kafka.raft.ReplicaKey; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class DescribeQuorumRpc { + public static DescribeQuorumRequestData singletonDescribeQuorumRequest( + TopicPartition topicPartition + ) { + return new DescribeQuorumRequestData() + .setTopics( + Collections.singletonList( + new DescribeQuorumRequestData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new DescribeQuorumRequestData.PartitionData() + .setPartitionIndex(topicPartition.partition()) + ) + ) + ) + ); + } + + public static DescribeQuorumResponseData singletonDescribeQuorumResponse( + short apiVersion, + TopicPartition topicPartition, + int leaderId, + int leaderEpoch, + long highWatermark, + Collection<LeaderState.ReplicaState> voters, + Collection<LeaderState.ReplicaState> observers, + long currentTimeMs + ) { + DescribeQuorumResponseData response = new DescribeQuorumResponseData() + .setTopics( + Collections.singletonList( + new DescribeQuorumResponseData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new DescribeQuorumResponseData.PartitionData() + .setPartitionIndex(topicPartition.partition()) + .setErrorCode(Errors.NONE.code()) + .setLeaderId(leaderId) + .setLeaderEpoch(leaderEpoch) + .setHighWatermark(highWatermark) + .setCurrentVoters(toReplicaStates(apiVersion, leaderId, voters, currentTimeMs)) + .setObservers(toReplicaStates(apiVersion, leaderId, observers, currentTimeMs)) + ) + ) + ) + ); + + if (apiVersion >= 2) { + DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(voters.size()); + for (LeaderState.ReplicaState voter : voters) { + nodes.add( + new DescribeQuorumResponseData.Node() + .setNodeId(voter.replicaKey().id()) +
.setListeners(voter.listeners().toDescribeQuorumResponseListeners()) + ); + } + response.setNodes(nodes); + } + return response; + } + + private static List<DescribeQuorumResponseData.ReplicaState> toReplicaStates( + short apiVersion, + int leaderId, + Collection<LeaderState.ReplicaState> states, + long currentTimeMs + ) { + return states + .stream() + .map(replicaState -> toReplicaState(apiVersion, leaderId, replicaState, currentTimeMs)) + .collect(Collectors.toList()); + } + + private static DescribeQuorumResponseData.ReplicaState toReplicaState( + short apiVersion, + int leaderId, + LeaderState.ReplicaState replicaState, + long currentTimeMs + ) { + final long lastCaughtUpTimestamp; + final long lastFetchTimestamp; + if (replicaState.replicaKey().id() == leaderId) { + lastCaughtUpTimestamp = currentTimeMs; + lastFetchTimestamp = currentTimeMs; + } else { + lastCaughtUpTimestamp = replicaState.lastCaughtUpTimestamp(); + lastFetchTimestamp = replicaState.lastFetchTimestamp(); + } + DescribeQuorumResponseData.ReplicaState replicaStateData = new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(replicaState.replicaKey().id()) + .setLogEndOffset(replicaState.endOffset().map(LogOffsetMetadata::offset).orElse(-1L)) + .setLastCaughtUpTimestamp(lastCaughtUpTimestamp) + .setLastFetchTimestamp(lastFetchTimestamp); + + if (apiVersion >= 2) { + replicaStateData.setReplicaDirectoryId(replicaState.replicaKey().directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); + } + return replicaStateData; + } + + public static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicPartition topicPartition) { + return data.topics().size() == 1 && + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/DynamicReconfigRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/DynamicReconfigRpc.java new file mode 100644 index 0000000000000..78f1069995326 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/utils/DynamicReconfigRpc.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.raft.utils; + +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.AddRaftVoterRequestData; +import org.apache.kafka.common.message.AddRaftVoterResponseData; +import org.apache.kafka.common.message.RemoveRaftVoterRequestData; +import org.apache.kafka.common.message.RemoveRaftVoterResponseData; +import org.apache.kafka.common.message.UpdateRaftVoterRequestData; +import org.apache.kafka.common.message.UpdateRaftVoterResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.raft.ReplicaKey; + +import java.net.InetSocketAddress; +import java.util.Optional; + +public class DynamicReconfigRpc { + public static AddRaftVoterRequestData addVoterRequest( + String clusterId, + int timeoutMs, + ReplicaKey voter, + Endpoints listeners + ) { + return new AddRaftVoterRequestData() + .setClusterId(clusterId) + .setTimeoutMs(timeoutMs) + .setVoterId(voter.id()) + .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + .setListeners(listeners.toAddVoterRequest()); + } + + public static AddRaftVoterResponseData addVoterResponse( + Errors error, + String errorMessage + ) { + errorMessage = errorMessage == null ? error.message() : errorMessage; + + return new AddRaftVoterResponseData() + .setErrorCode(error.code()) + .setErrorMessage(errorMessage); + } + + public static RemoveRaftVoterRequestData removeVoterRequest( + String clusterId, + ReplicaKey voter + ) { + return new RemoveRaftVoterRequestData() + .setClusterId(clusterId) + .setVoterId(voter.id()) + .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); + } + + public static RemoveRaftVoterResponseData removeVoterResponse( + Errors error, + String errorMessage + ) { + errorMessage = errorMessage == null ? 
error.message() : errorMessage; + + return new RemoveRaftVoterResponseData() + .setErrorCode(error.code()) + .setErrorMessage(errorMessage); + } + + public static UpdateRaftVoterRequestData updateVoterRequest( + String clusterId, + ReplicaKey voter, + int epoch, + SupportedVersionRange supportedVersions, + Endpoints endpoints + ) { + UpdateRaftVoterRequestData request = new UpdateRaftVoterRequestData() + .setClusterId(clusterId) + .setCurrentLeaderEpoch(epoch) + .setVoterId(voter.id()) + .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + .setListeners(endpoints.toUpdateVoterRequest()); + + request.kRaftVersionFeature() + .setMinSupportedVersion(supportedVersions.min()) + .setMaxSupportedVersion(supportedVersions.max()); + + return request; + } + + public static UpdateRaftVoterResponseData updateVoterResponse( + Errors error, + ListenerName listenerName, + LeaderAndEpoch leaderAndEpoch, + Endpoints endpoints + ) { + UpdateRaftVoterResponseData response = new UpdateRaftVoterResponseData() + .setErrorCode(error.code()); + + response.currentLeader() + .setLeaderId(leaderAndEpoch.leaderId().orElse(-1)) + .setLeaderEpoch(leaderAndEpoch.epoch()); + + Optional<InetSocketAddress> address = endpoints.address(listenerName); + if (address.isPresent()) { + response.currentLeader() + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()); + } + + return response; + } + + public static Optional<ReplicaKey> addVoterRequestVoterKey(AddRaftVoterRequestData request) { + if (request.voterId() < 0) { + return Optional.empty(); + } else { + return Optional.of(ReplicaKey.of(request.voterId(), request.voterDirectoryId())); + } + } + + public static Optional<ReplicaKey> removeVoterRequestVoterKey(RemoveRaftVoterRequestData request) { + if (request.voterId() < 0) { + return Optional.empty(); + } else { + return Optional.of(ReplicaKey.of(request.voterId(), request.voterDirectoryId())); + } + } + + public static Optional<ReplicaKey> updateVoterRequestVoterKey(UpdateRaftVoterRequestData request) { + if (request.voterId() < 0) { + return Optional.empty(); + } else { + return Optional.of(ReplicaKey.of(request.voterId(), request.voterDirectoryId())); + } + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/EndQuorumEpochRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/EndQuorumEpochRpc.java new file mode 100644 index 0000000000000..d0d90b00f6800 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/utils/EndQuorumEpochRpc.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.raft.utils; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.EndQuorumEpochRequestData; +import org.apache.kafka.common.message.EndQuorumEpochResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.ReplicaKey; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +public class EndQuorumEpochRpc { + public static EndQuorumEpochRequestData singletonEndQuorumEpochRequest( + TopicPartition topicPartition, + String clusterId, + int leaderEpoch, + int leaderId, + List<ReplicaKey> preferredReplicaKeys + ) { + List<Integer> preferredSuccessors = preferredReplicaKeys + .stream() + .map(ReplicaKey::id) + .collect(Collectors.toList()); + + List<EndQuorumEpochRequestData.ReplicaInfo> preferredCandidates = preferredReplicaKeys + .stream() + .map(replicaKey -> new EndQuorumEpochRequestData.ReplicaInfo() + .setCandidateId(replicaKey.id()) + .setCandidateDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + ) + .collect(Collectors.toList()); + + return new EndQuorumEpochRequestData() + .setClusterId(clusterId) + .setTopics( + Collections.singletonList( + new EndQuorumEpochRequestData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new EndQuorumEpochRequestData.PartitionData() + .setPartitionIndex(topicPartition.partition()) + .setLeaderEpoch(leaderEpoch) + .setLeaderId(leaderId) + .setPreferredSuccessors(preferredSuccessors) + .setPreferredCandidates(preferredCandidates) + ) + ) + ) + ); + } + + public static EndQuorumEpochResponseData singletonEndQuorumEpochResponse( + ListenerName listenerName, + short apiVersion, + Errors topLevelError, + TopicPartition topicPartition, + Errors partitionLevelError, + int leaderEpoch, + int leaderId, + Endpoints endpoints + ) { + EndQuorumEpochResponseData response = new EndQuorumEpochResponseData() + .setErrorCode(topLevelError.code()) + .setTopics(Collections.singletonList( + new EndQuorumEpochResponseData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions(Collections.singletonList( + new EndQuorumEpochResponseData.PartitionData() + .setErrorCode(partitionLevelError.code()) + .setLeaderId(leaderId) + .setLeaderEpoch(leaderEpoch) + )) + ) + ); + + if (apiVersion >= 1) { + Optional<InetSocketAddress> address = endpoints.address(listenerName); + if (address.isPresent() && leaderId >= 0) { + // Populate the node endpoints + EndQuorumEpochResponseData.NodeEndpointCollection nodeEndpoints = + new EndQuorumEpochResponseData.NodeEndpointCollection(1); + nodeEndpoints.add( + new EndQuorumEpochResponseData.NodeEndpoint() + .setNodeId(leaderId) + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()) + ); + response.setNodeEndpoints(nodeEndpoints); + } + } + + return response; + } + + public static boolean hasValidTopicPartition(EndQuorumEpochRequestData data, TopicPartition topicPartition) { + return data.topics().size() == 1 && + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + } + + public static boolean hasValidTopicPartition(EndQuorumEpochResponseData data, TopicPartition topicPartition) { + return data.topics().size() == 1 && +
data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/FetchRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/FetchRpc.java new file mode 100644 index 0000000000000..fb1e8d90cc76a --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/utils/FetchRpc.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.utils; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.FetchRequestData; +import org.apache.kafka.common.message.FetchResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.raft.Endpoints; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Optional; +import java.util.function.Consumer; + +public class FetchRpc { + public static FetchRequestData singletonFetchRequest( + TopicPartition topicPartition, + Uuid topicId, + Consumer<FetchRequestData.FetchPartition> partitionConsumer + ) { + FetchRequestData.FetchPartition fetchPartition = + new FetchRequestData.FetchPartition() + .setPartition(topicPartition.partition()); + partitionConsumer.accept(fetchPartition); + + FetchRequestData.FetchTopic fetchTopic = + new FetchRequestData.FetchTopic() + .setTopic(topicPartition.topic()) + .setTopicId(topicId) + .setPartitions(Collections.singletonList(fetchPartition)); + + return new FetchRequestData() + .setTopics(Collections.singletonList(fetchTopic)); + } + + public static FetchResponseData singletonFetchResponse( + ListenerName listenerName, + short apiVersion, + TopicPartition topicPartition, + Uuid topicId, + Errors topLevelError, + int leaderId, + Endpoints endpoints, + Consumer<FetchResponseData.PartitionData> partitionConsumer + ) { + FetchResponseData.PartitionData fetchablePartition = + new FetchResponseData.PartitionData(); + + fetchablePartition.setPartitionIndex(topicPartition.partition()); + + partitionConsumer.accept(fetchablePartition); + + FetchResponseData.FetchableTopicResponse fetchableTopic = + new FetchResponseData.FetchableTopicResponse() + .setTopic(topicPartition.topic()) + .setTopicId(topicId) + .setPartitions(Collections.singletonList(fetchablePartition)); + + FetchResponseData response = new FetchResponseData(); + + if (apiVersion >= 17) { + Optional<InetSocketAddress> address = endpoints.address(listenerName); + if (address.isPresent() && leaderId >= 0) { + // Populate the node endpoints + FetchResponseData.NodeEndpointCollection nodeEndpoints = new FetchResponseData.NodeEndpointCollection(1); + nodeEndpoints.add( + new
FetchResponseData.NodeEndpoint() + .setNodeId(leaderId) + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()) + ); + response.setNodeEndpoints(nodeEndpoints); + } + } + + return response + .setErrorCode(topLevelError.code()) + .setResponses(Collections.singletonList(fetchableTopic)); + } + + public static boolean hasValidTopicPartition(FetchRequestData data, TopicPartition topicPartition, Uuid topicId) { + return data.topics().size() == 1 && + data.topics().get(0).topicId().equals(topicId) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partition() == topicPartition.partition(); + } + + public static boolean hasValidTopicPartition(FetchResponseData data, TopicPartition topicPartition, Uuid topicId) { + return data.responses().size() == 1 && + data.responses().get(0).topicId().equals(topicId) && + data.responses().get(0).partitions().size() == 1 && + data.responses().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/FetchSnapshotRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/FetchSnapshotRpc.java new file mode 100644 index 0000000000000..5a3568429dd47 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/utils/FetchSnapshotRpc.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.utils; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.FetchSnapshotRequestData; +import org.apache.kafka.common.message.FetchSnapshotResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.raft.ReplicaKey; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Optional; +import java.util.function.UnaryOperator; + +public class FetchSnapshotRpc { + public static FetchSnapshotRequestData singletonFetchSnapshotRequest( + String clusterId, + ReplicaKey replicaKey, + TopicPartition topicPartition, + int epoch, + OffsetAndEpoch offsetAndEpoch, + int maxBytes, + long position + ) { + FetchSnapshotRequestData.SnapshotId snapshotId = new FetchSnapshotRequestData.SnapshotId() + .setEndOffset(offsetAndEpoch.offset()) + .setEpoch(offsetAndEpoch.epoch()); + + FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = new FetchSnapshotRequestData.PartitionSnapshot() + .setPartition(topicPartition.partition()) + .setCurrentLeaderEpoch(epoch) + .setSnapshotId(snapshotId) + .setPosition(position) + .setReplicaDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); + + return new FetchSnapshotRequestData() + .setClusterId(clusterId) + .setReplicaId(replicaKey.id()) + .setMaxBytes(maxBytes) + .setTopics( + Collections.singletonList( + new FetchSnapshotRequestData.TopicSnapshot() + .setName(topicPartition.topic()) + .setPartitions(Collections.singletonList(partitionSnapshot)) + ) + ); + } + + /** + * Creates a FetchSnapshotResponseData with a single PartitionSnapshot for the topic partition. + *
+ * The partition index will already be populated when calling operator. + * + * @param listenerName the listener used to accept the request + * @param apiVersion the api version of the request + * @param topicPartition the topic partition to include + * @param leaderId the id of the leader + * @param endpoints the endpoints of the leader + * @param operator unary operator responsible for populating all of the appropriate fields + * @return the created fetch snapshot response data + */ + public static FetchSnapshotResponseData singletonFetchSnapshotResponse( + ListenerName listenerName, + short apiVersion, + TopicPartition topicPartition, + int leaderId, + Endpoints endpoints, + UnaryOperator<FetchSnapshotResponseData.PartitionSnapshot> operator + ) { + FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot = operator.apply( + new FetchSnapshotResponseData.PartitionSnapshot().setIndex(topicPartition.partition()) + ); + + FetchSnapshotResponseData response = new FetchSnapshotResponseData() + .setTopics( + Collections.singletonList( + new FetchSnapshotResponseData.TopicSnapshot() + .setName(topicPartition.topic()) + .setPartitions(Collections.singletonList(partitionSnapshot)) + ) + ); + + if (apiVersion >= 1) { + Optional<InetSocketAddress> address = endpoints.address(listenerName); + if (address.isPresent() && leaderId >= 0) { + // Populate the node endpoints + FetchSnapshotResponseData.NodeEndpointCollection nodeEndpoints = + new FetchSnapshotResponseData.NodeEndpointCollection(1); + nodeEndpoints.add( + new FetchSnapshotResponseData.NodeEndpoint() + .setNodeId(leaderId) + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()) + ); + response.setNodeEndpoints(nodeEndpoints); + } + } + + return response; + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/VoteRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/VoteRpc.java new file mode 100644 index 0000000000000..30033cf283811 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/utils/VoteRpc.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.raft.utils; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.VoteRequestData; +import org.apache.kafka.common.message.VoteResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.ReplicaKey; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Optional; + +public class VoteRpc { + public static VoteRequestData singletonVoteRequest( + TopicPartition topicPartition, + String clusterId, + int replicaEpoch, + ReplicaKey replicaKey, + ReplicaKey voterKey, + int lastEpoch, + long lastEpochEndOffset, + boolean preVote + ) { + return new VoteRequestData() + .setClusterId(clusterId) + .setVoterId(voterKey.id()) + .setTopics( + Collections.singletonList( + new VoteRequestData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new VoteRequestData.PartitionData() + .setPartitionIndex(topicPartition.partition()) + .setReplicaEpoch(replicaEpoch) + .setReplicaId(replicaKey.id()) + .setReplicaDirectoryId( + replicaKey + .directoryId() + .orElse(ReplicaKey.NO_DIRECTORY_ID) + ) + .setVoterDirectoryId( + voterKey + .directoryId() + .orElse(ReplicaKey.NO_DIRECTORY_ID) + ) + .setLastOffsetEpoch(lastEpoch) + .setLastOffset(lastEpochEndOffset) + .setPreVote(preVote) + ) + ) + ) + ); + } + + public static VoteResponseData singletonVoteResponse( + ListenerName listenerName, + short apiVersion, + Errors topLevelError, + TopicPartition topicPartition, + Errors partitionLevelError, + int leaderEpoch, + int leaderId, + boolean voteGranted, + Endpoints endpoints + ) { + VoteResponseData response = new VoteResponseData() + .setErrorCode(topLevelError.code()) + .setTopics(Collections.singletonList( + new VoteResponseData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions(Collections.singletonList( + new VoteResponseData.PartitionData() + .setErrorCode(partitionLevelError.code()) + .setLeaderId(leaderId) + .setLeaderEpoch(leaderEpoch) + .setVoteGranted(voteGranted)) + ) + ) + ); + + if (apiVersion >= 1) { + Optional<InetSocketAddress> address = endpoints.address(listenerName); + if (address.isPresent() && leaderId >= 0) { + // Populate the node endpoints + VoteResponseData.NodeEndpointCollection nodeEndpoints = new VoteResponseData.NodeEndpointCollection(1); + nodeEndpoints.add( + new VoteResponseData.NodeEndpoint() + .setNodeId(leaderId) + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()) + ); + response.setNodeEndpoints(nodeEndpoints); + } + } + + return response; + } + + public static Optional<ReplicaKey> voteRequestVoterKey( + VoteRequestData request, + VoteRequestData.PartitionData partition + ) { + if (request.voterId() < 0) { + return Optional.empty(); + } else { + return Optional.of(ReplicaKey.of(request.voterId(), partition.voterDirectoryId())); + } + } + + public static boolean hasValidTopicPartition(VoteResponseData data, TopicPartition topicPartition) { + return data.topics().size() == 1 && + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + } + + public static boolean hasValidTopicPartition(VoteRequestData data, TopicPartition topicPartition) { + return data.topics().size() == 1 && + data.topics().get(0).topicName().equals(topicPartition.topic()) && +
data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + } +} diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java index ba49a9c14310b..36d40a63ba6e8 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java @@ -50,6 +50,9 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; +import org.apache.kafka.raft.utils.DynamicReconfigRpc; +import org.apache.kafka.raft.utils.FetchRpc; +import org.apache.kafka.raft.utils.FetchSnapshotRpc; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -288,7 +291,7 @@ private ApiMessage buildTestRequest(ApiKeys key) { return VoteRequest.singletonRequest(topicPartition, clusterId, leaderEpoch, leaderId, lastEpoch, 329, true); case FETCH: - FetchRequestData request = RaftUtil.singletonFetchRequest(topicPartition, topicId, fetchPartition -> + FetchRequestData request = FetchRpc.singletonFetchRequest(topicPartition, topicId, fetchPartition -> fetchPartition .setCurrentLeaderEpoch(5) .setFetchOffset(333) @@ -298,7 +301,7 @@ private ApiMessage buildTestRequest(ApiKeys key) { return request; case FETCH_SNAPSHOT: - return RaftUtil.singletonFetchSnapshotRequest( + return FetchSnapshotRpc.singletonFetchSnapshotRequest( clusterId, ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), topicPartition, @@ -309,7 +312,7 @@ private ApiMessage buildTestRequest(ApiKeys key) { ); case UPDATE_RAFT_VOTER: - return RaftUtil.updateVoterRequest( + return DynamicReconfigRpc.updateVoterRequest( clusterId, ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), 5, diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java index 43cfeb29fe1b0..2d0225f533a20 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.raft.RaftClientTestContext.RaftProtocol; +import org.apache.kafka.raft.utils.ApiMessageUtils; import org.apache.kafka.server.common.KRaftVersion; import org.junit.jupiter.params.ParameterizedTest; @@ -795,7 +796,7 @@ public void testPreVoteNotSupportedByRemote(KRaftVersion kraftVersion) throws Ex context.deliverResponse( voteRequests.get(0).correlationId(), voteRequests.get(0).destination(), - RaftUtil.errorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION) + ApiMessageUtils.parseErrorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION) ); // Local should transition to Candidate since it realizes remote node does not support PreVote. 
@@ -837,7 +838,7 @@ public void testPreVoteNotSupportedByRemote(KRaftVersion kraftVersion) throws Ex context.deliverResponse( voteRequests.get(1).correlationId(), voteRequests.get(1).destination(), - RaftUtil.errorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION) + ApiMessageUtils.parseErrorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION) ); context.client.poll(); assertEquals(epoch + 2, context.currentEpoch()); diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index 1f3307f9adaef..6231b9acb7598 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.record.UnalignedMemoryRecords; import org.apache.kafka.common.requests.FetchSnapshotRequest; import org.apache.kafka.raft.internals.StringSerde; +import org.apache.kafka.raft.utils.FetchSnapshotRpc; import org.apache.kafka.snapshot.RawSnapshotReader; import org.apache.kafka.snapshot.RawSnapshotWriter; import org.apache.kafka.snapshot.RecordsSnapshotWriter; @@ -2112,7 +2113,7 @@ public static FetchSnapshotRequestData fetchSnapshotRequest( int maxBytes, long position ) { - return RaftUtil.singletonFetchSnapshotRequest( + return FetchSnapshotRpc.singletonFetchSnapshotRequest( null, ReplicaKey.of(-1, ReplicaKey.NO_DIRECTORY_ID), topicPartition, @@ -2132,7 +2133,7 @@ private static FetchSnapshotRequestData fetchSnapshotRequest( int maxBytes, long position ) { - return RaftUtil.singletonFetchSnapshotRequest( + return FetchSnapshotRpc.singletonFetchSnapshotRequest( clusterId, replicaKey, topicPartition, diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 29a9ccb561079..a4691906ae9af 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -65,6 +65,14 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.internals.BatchBuilder; import org.apache.kafka.raft.internals.StringSerde; +import org.apache.kafka.raft.utils.ApiMessageUtils; +import org.apache.kafka.raft.utils.BeginQuorumEpochRpc; +import org.apache.kafka.raft.utils.DescribeQuorumRpc; +import org.apache.kafka.raft.utils.DynamicReconfigRpc; +import org.apache.kafka.raft.utils.EndQuorumEpochRpc; +import org.apache.kafka.raft.utils.FetchRpc; +import org.apache.kafka.raft.utils.FetchSnapshotRpc; +import org.apache.kafka.raft.utils.VoteRpc; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.serialization.RecordSerde; @@ -102,7 +110,6 @@ import static org.apache.kafka.raft.LeaderState.CHECK_QUORUM_TIMEOUT_FACTOR; import static org.apache.kafka.raft.RaftClientTestContext.RaftProtocol.KIP_853_PROTOCOL; -import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -625,7 +632,7 @@ void expectAndGrantPreVotes(int epoch) throws Exception { deliverResponse( request.correlationId(), request.destination(), - RaftUtil.errorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION) + ApiMessageUtils.parseErrorResponse(ApiKeys.VOTE, 
Errors.UNSUPPORTED_VERSION) ); } else { VoteResponseData voteResponse = voteResponse(true, OptionalInt.empty(), epoch); @@ -840,7 +847,7 @@ void assertSentVoteResponse( RaftMessage raftMessage = sentMessages.get(0); assertInstanceOf(VoteResponseData.class, raftMessage.data()); VoteResponseData response = (VoteResponseData) raftMessage.data(); - assertTrue(hasValidTopicPartition(response, metadataPartition)); + assertTrue(VoteRpc.hasValidTopicPartition(response, metadataPartition)); VoteResponseData.PartitionData partitionResponse = response.topics().get(0).partitions().get(0); @@ -1421,7 +1428,7 @@ EndQuorumEpochResponseData endEpochResponse( int epoch, OptionalInt leaderId ) { - return RaftUtil.singletonEndQuorumEpochResponse( + return EndQuorumEpochRpc.singletonEndQuorumEpochResponse( channel.listenerName(), endQuorumEpochRpcVersion(), Errors.NONE, @@ -1452,7 +1459,7 @@ EndQuorumEpochRequestData endEpochRequest( int leaderId, List preferredCandidates ) { - return RaftUtil.singletonEndQuorumEpochRequest( + return EndQuorumEpochRpc.singletonEndQuorumEpochRequest( metadataPartition, clusterId, epoch, @@ -1503,7 +1510,7 @@ BeginQuorumEpochRequestData beginEpochRequest( Endpoints endpoints, ReplicaKey voterKey ) { - return RaftUtil.singletonBeginQuorumEpochRequest( + return BeginQuorumEpochRpc.singletonBeginQuorumEpochRequest( metadataPartition, clusterId, epoch, @@ -1514,7 +1521,7 @@ BeginQuorumEpochRequestData beginEpochRequest( } BeginQuorumEpochResponseData beginEpochResponse(int epoch, int leaderId) { - return RaftUtil.singletonBeginQuorumEpochResponse( + return BeginQuorumEpochRpc.singletonBeginQuorumEpochResponse( channel.listenerName(), beginQuorumEpochRpcVersion(), Errors.NONE, @@ -1590,7 +1597,7 @@ VoteRequestData voteRequest( long lastEpochOffset, boolean preVote ) { - return RaftUtil.singletonVoteRequest( + return VoteRpc.singletonVoteRequest( metadataPartition, clusterId, epoch, @@ -1611,7 +1618,7 @@ VoteResponseData voteResponse(Errors error, OptionalInt leaderId, int epoch) { } VoteResponseData voteResponse(Errors error, boolean voteGranted, OptionalInt leaderId, int epoch, short version) { - return RaftUtil.singletonVoteResponse( + return VoteRpc.singletonVoteResponse( channel.listenerName(), version, Errors.NONE, @@ -1625,7 +1632,7 @@ VoteResponseData voteResponse(Errors error, boolean voteGranted, OptionalInt lea } private VoteRequestData.PartitionData unwrap(VoteRequestData voteRequest) { - assertTrue(RaftUtil.hasValidTopicPartition(voteRequest, metadataPartition)); + assertTrue(VoteRpc.hasValidTopicPartition(voteRequest, metadataPartition)); return voteRequest.topics().get(0).partitions().get(0); } @@ -1736,7 +1743,7 @@ FetchRequestData fetchRequest( int lastFetchedEpoch, int maxWaitTimeMs ) { - FetchRequestData request = RaftUtil.singletonFetchRequest( + FetchRequestData request = FetchRpc.singletonFetchRequest( metadataPartition, metadataTopicId, fetchPartition -> { @@ -1765,7 +1772,7 @@ FetchResponseData fetchResponse( long highWatermark, Errors error ) { - return RaftUtil.singletonFetchResponse( + return FetchRpc.singletonFetchResponse( channel.listenerName(), fetchRpcVersion(), metadataPartition, @@ -1793,7 +1800,7 @@ FetchResponseData divergingFetchResponse( int divergingEpoch, long highWatermark ) { - return RaftUtil.singletonFetchResponse( + return FetchRpc.singletonFetchResponse( channel.listenerName(), fetchRpcVersion(), metadataPartition, @@ -1823,7 +1830,7 @@ FetchResponseData snapshotFetchResponse( OffsetAndEpoch snapshotId, long highWatermark ) { - return 
RaftUtil.singletonFetchResponse( + return FetchRpc.singletonFetchResponse( channel.listenerName(), fetchRpcVersion(), metadataPartition, @@ -1851,7 +1858,7 @@ FetchSnapshotResponseData fetchSnapshotResponse( int leaderId, UnaryOperator operator ) { - return RaftUtil.singletonFetchSnapshotResponse( + return FetchSnapshotRpc.singletonFetchSnapshotResponse( channel.listenerName(), fetchSnapshotRpcVersion(), metadataPartition, @@ -1862,7 +1869,7 @@ FetchSnapshotResponseData fetchSnapshotResponse( } DescribeQuorumRequestData describeQuorumRequest() { - return RaftUtil.singletonDescribeQuorumRequest(metadataPartition); + return DescribeQuorumRpc.singletonDescribeQuorumRequest(metadataPartition); } AddRaftVoterRequestData addVoterRequest( @@ -1884,7 +1891,7 @@ AddRaftVoterRequestData addVoterRequest( ReplicaKey voter, Endpoints endpoints ) { - return RaftUtil.addVoterRequest( + return DynamicReconfigRpc.addVoterRequest( clusterId, timeoutMs, voter, @@ -1897,7 +1904,7 @@ RemoveRaftVoterRequestData removeVoterRequest(ReplicaKey voter) { } RemoveRaftVoterRequestData removeVoterRequest(String cluster, ReplicaKey voter) { - return RaftUtil.removeVoterRequest(cluster, voter); + return DynamicReconfigRpc.removeVoterRequest(cluster, voter); } UpdateRaftVoterRequestData updateVoterRequest( @@ -1915,14 +1922,14 @@ UpdateRaftVoterRequestData updateVoterRequest( SupportedVersionRange supportedVersions, Endpoints endpoints ) { - return RaftUtil.updateVoterRequest(clusterId, voter, epoch, supportedVersions, endpoints); + return DynamicReconfigRpc.updateVoterRequest(clusterId, voter, epoch, supportedVersions, endpoints); } UpdateRaftVoterResponseData updateVoterResponse( Errors error, LeaderAndEpoch leaderAndEpoch ) { - return RaftUtil.updateVoterResponse( + return DynamicReconfigRpc.updateVoterResponse( error, channel.listenerName(), leaderAndEpoch, diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java index 6bfaf7b7ff37f..c9a439a72bc68 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java @@ -51,6 +51,13 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.raft.utils.ApiMessageUtils; +import org.apache.kafka.raft.utils.BeginQuorumEpochRpc; +import org.apache.kafka.raft.utils.DescribeQuorumRpc; +import org.apache.kafka.raft.utils.EndQuorumEpochRpc; +import org.apache.kafka.raft.utils.FetchRpc; +import org.apache.kafka.raft.utils.FetchSnapshotRpc; +import org.apache.kafka.raft.utils.VoteRpc; import com.fasterxml.jackson.databind.JsonNode; @@ -81,16 +88,16 @@ public class RaftUtilTest { @Test public void testErrorResponse() { assertEquals(new VoteResponseData().setErrorCode(Errors.NONE.code()), - RaftUtil.errorResponse(ApiKeys.VOTE, Errors.NONE)); + ApiMessageUtils.parseErrorResponse(ApiKeys.VOTE, Errors.NONE)); assertEquals(new BeginQuorumEpochResponseData().setErrorCode(Errors.NONE.code()), - RaftUtil.errorResponse(ApiKeys.BEGIN_QUORUM_EPOCH, Errors.NONE)); + ApiMessageUtils.parseErrorResponse(ApiKeys.BEGIN_QUORUM_EPOCH, Errors.NONE)); assertEquals(new EndQuorumEpochResponseData().setErrorCode(Errors.NONE.code()), - RaftUtil.errorResponse(ApiKeys.END_QUORUM_EPOCH, Errors.NONE)); + ApiMessageUtils.parseErrorResponse(ApiKeys.END_QUORUM_EPOCH, Errors.NONE)); assertEquals(new 
FetchResponseData().setErrorCode(Errors.NONE.code()), - RaftUtil.errorResponse(ApiKeys.FETCH, Errors.NONE)); + ApiMessageUtils.parseErrorResponse(ApiKeys.FETCH, Errors.NONE)); assertEquals(new FetchSnapshotResponseData().setErrorCode(Errors.NONE.code()), - RaftUtil.errorResponse(ApiKeys.FETCH_SNAPSHOT, Errors.NONE)); - assertThrows(IllegalArgumentException.class, () -> RaftUtil.errorResponse(ApiKeys.PRODUCE, Errors.NONE)); + ApiMessageUtils.parseErrorResponse(ApiKeys.FETCH_SNAPSHOT, Errors.NONE)); + assertThrows(IllegalArgumentException.class, () -> ApiMessageUtils.parseErrorResponse(ApiKeys.PRODUCE, Errors.NONE)); } private static Stream singletonFetchRequestTestCases() { @@ -327,7 +334,7 @@ private static Stream describeQuorumResponseTestCases() { @ParameterizedTest @MethodSource("singletonFetchRequestTestCases") public void testSingletonFetchRequestForAllVersion(final FetchRequestTestCase testCase) { - FetchRequestData fetchRequestData = RaftUtil.singletonFetchRequest(topicPartition, Uuid.ONE_UUID, + FetchRequestData fetchRequestData = FetchRpc.singletonFetchRequest(topicPartition, Uuid.ONE_UUID, partition -> partition .setPartitionMaxBytes(10) .setCurrentLeaderEpoch(5) @@ -348,7 +355,7 @@ public void testSingletonFetchRequestForAllVersion(final FetchRequestTestCase te @ParameterizedTest @MethodSource("singletonFetchRequestTestCases") public void testFetchRequestV17Compatibility(final FetchRequestTestCase testCase) { - FetchRequestData fetchRequestData = RaftUtil.singletonFetchRequest( + FetchRequestData fetchRequestData = FetchRpc.singletonFetchRequest( topicPartition, Uuid.ONE_UUID, partition -> partition @@ -374,7 +381,7 @@ public void testSingletonFetchResponseForAllVersion(final FetchResponseTestCase final int producerId = 1; final int firstOffset = 10; - FetchResponseData fetchResponseData = RaftUtil.singletonFetchResponse( + FetchResponseData fetchResponseData = FetchRpc.singletonFetchResponse( listenerName, testCase.version, topicPartition, @@ -410,7 +417,7 @@ public void testSingletonVoteRequestForAllVersion(final short version, final Str int lastEpoch = 1000; long lastEpochOffset = 1000; - VoteRequestData voteRequestData = RaftUtil.singletonVoteRequest( + VoteRequestData voteRequestData = VoteRpc.singletonVoteRequest( topicPartition, clusterId, replicaEpoch, @@ -430,7 +437,7 @@ public void testSingletonVoteResponseForAllVersion(final short version, final St int leaderEpoch = 1; int leaderId = 1; - VoteResponseData voteResponseData = RaftUtil.singletonVoteResponse( + VoteResponseData voteResponseData = VoteRpc.singletonVoteResponse( listenerName, version, Errors.NONE, @@ -454,7 +461,7 @@ public void testSingletonFetchSnapshotRequestForAllVersion(final short version, int maxBytes = 1000; int position = 10; - FetchSnapshotRequestData fetchSnapshotRequestData = RaftUtil.singletonFetchSnapshotRequest( + FetchSnapshotRequestData fetchSnapshotRequestData = FetchSnapshotRpc.singletonFetchSnapshotRequest( clusterId, ReplicaKey.of(1, directoryId), topicPartition, @@ -482,7 +489,7 @@ public void testSingletonFetchSnapshotRequestV1Compatibility( int maxBytes = 1000; int position = 10; - FetchSnapshotRequestData fetchSnapshotRequestData = RaftUtil.singletonFetchSnapshotRequest( + FetchSnapshotRequestData fetchSnapshotRequestData = FetchSnapshotRpc.singletonFetchSnapshotRequest( clusterId, ReplicaKey.of(1, directoryId), topicPartition, @@ -501,7 +508,7 @@ public void testSingletonFetchSnapshotRequestV1Compatibility( public void testSingletonFetchSnapshotResponseForAllVersion(final short 
version, final String expectedJson) { int leaderId = 1; - FetchSnapshotResponseData fetchSnapshotResponseData = RaftUtil.singletonFetchSnapshotResponse( + FetchSnapshotResponseData fetchSnapshotResponseData = FetchSnapshotRpc.singletonFetchSnapshotResponse( listenerName, version, topicPartition, @@ -520,7 +527,7 @@ public void testSingletonBeginQuorumEpochRequestForAllVersion(final short versio int leaderEpoch = 1; int leaderId = 1; - BeginQuorumEpochRequestData beginQuorumEpochRequestData = RaftUtil.singletonBeginQuorumEpochRequest( + BeginQuorumEpochRequestData beginQuorumEpochRequestData = BeginQuorumEpochRpc.singletonBeginQuorumEpochRequest( topicPartition, clusterId, leaderEpoch, @@ -538,7 +545,7 @@ public void testSingletonBeginQuorumEpochResponseForAllVersion(final short versi int leaderEpoch = 1; int leaderId = 1; - BeginQuorumEpochResponseData beginQuorumEpochResponseData = RaftUtil.singletonBeginQuorumEpochResponse( + BeginQuorumEpochResponseData beginQuorumEpochResponseData = BeginQuorumEpochRpc.singletonBeginQuorumEpochResponse( listenerName, version, Errors.NONE, @@ -558,7 +565,7 @@ public void testSingletonEndQuorumEpochRequestForAllVersion(final short version, int leaderEpoch = 1; int leaderId = 1; - EndQuorumEpochRequestData endQuorumEpochRequestData = RaftUtil.singletonEndQuorumEpochRequest( + EndQuorumEpochRequestData endQuorumEpochRequestData = EndQuorumEpochRpc.singletonEndQuorumEpochRequest( topicPartition, clusterId, leaderEpoch, @@ -575,7 +582,7 @@ public void testSingletonEndQuorumEpochResponseForAllVersion(final short version int leaderEpoch = 1; int leaderId = 1; - EndQuorumEpochResponseData endQuorumEpochResponseData = RaftUtil.singletonEndQuorumEpochResponse( + EndQuorumEpochResponseData endQuorumEpochResponseData = EndQuorumEpochRpc.singletonEndQuorumEpochResponse( listenerName, version, Errors.NONE, @@ -592,7 +599,7 @@ public void testSingletonEndQuorumEpochResponseForAllVersion(final short version @ParameterizedTest @MethodSource("describeQuorumRequestTestCases") public void testSingletonDescribeQuorumRequestForAllVersion(final short version, final String expectedJson) { - DescribeQuorumRequestData describeQuorumRequestData = RaftUtil.singletonDescribeQuorumRequest(topicPartition); + DescribeQuorumRequestData describeQuorumRequestData = DescribeQuorumRpc.singletonDescribeQuorumRequest(topicPartition); JsonNode json = DescribeQuorumRequestDataJsonConverter.write(describeQuorumRequestData, version); assertEquals(expectedJson, json.toString()); } @@ -607,7 +614,7 @@ public void testSingletonDescribeQuorumResponseForAllVersion(final short version LeaderState.ReplicaState replicaState = new LeaderState.ReplicaState(replicaKey, true, Endpoints.empty()); - DescribeQuorumResponseData describeQuorumResponseData = RaftUtil.singletonDescribeQuorumResponse( + DescribeQuorumResponseData describeQuorumResponseData = DescribeQuorumRpc.singletonDescribeQuorumResponse( version, topicPartition, leaderId,