From 7b2740b68e21a4111aa6a0c188423444a1d30e89 Mon Sep 17 00:00:00 2001 From: frankvicky Date: Tue, 14 Jan 2025 08:47:40 +0800 Subject: [PATCH] KAFKA-16907: reformat --- .../kafka/raft/utils/BeginQuorumEpochRpc.java | 114 +++++++-------- .../kafka/raft/utils/DescribeQuorumRpc.java | 110 +++++++------- .../kafka/raft/utils/DynamicReconfigRpc.java | 86 +++++------ .../kafka/raft/utils/EndQuorumEpochRpc.java | 120 ++++++++-------- .../org/apache/kafka/raft/utils/FetchRpc.java | 70 ++++----- .../kafka/raft/utils/FetchSnapshotRpc.java | 98 ++++++------- .../org/apache/kafka/raft/utils/VoteRpc.java | 134 +++++++++--------- 7 files changed, 366 insertions(+), 366 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/BeginQuorumEpochRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/BeginQuorumEpochRpc.java index ad836fc751f5e..b09d4e4677f8c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/utils/BeginQuorumEpochRpc.java +++ b/raft/src/main/java/org/apache/kafka/raft/utils/BeginQuorumEpochRpc.java @@ -31,72 +31,72 @@ public class BeginQuorumEpochRpc { public static BeginQuorumEpochRequestData singletonBeginQuorumEpochRequest( - TopicPartition topicPartition, - String clusterId, - int leaderEpoch, - int leaderId, - Endpoints leaderEndpoints, - ReplicaKey voterKey + TopicPartition topicPartition, + String clusterId, + int leaderEpoch, + int leaderId, + Endpoints leaderEndpoints, + ReplicaKey voterKey ) { return new BeginQuorumEpochRequestData() - .setClusterId(clusterId) - .setVoterId(voterKey.id()) - .setTopics( - Collections.singletonList( - new BeginQuorumEpochRequestData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new BeginQuorumEpochRequestData.PartitionData() - .setPartitionIndex(topicPartition.partition()) - .setLeaderEpoch(leaderEpoch) - .setLeaderId(leaderId) - .setVoterDirectoryId(voterKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) - ) - ) + .setClusterId(clusterId) + .setVoterId(voterKey.id()) + .setTopics( + Collections.singletonList( + new BeginQuorumEpochRequestData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new BeginQuorumEpochRequestData.PartitionData() + .setPartitionIndex(topicPartition.partition()) + .setLeaderEpoch(leaderEpoch) + .setLeaderId(leaderId) + .setVoterDirectoryId(voterKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + ) ) ) - .setLeaderEndpoints(leaderEndpoints.toBeginQuorumEpochRequest()); + ) + .setLeaderEndpoints(leaderEndpoints.toBeginQuorumEpochRequest()); } public static BeginQuorumEpochResponseData singletonBeginQuorumEpochResponse( - ListenerName listenerName, - short apiVersion, - Errors topLevelError, - TopicPartition topicPartition, - Errors partitionLevelError, - int leaderEpoch, - int leaderId, - Endpoints endpoints + ListenerName listenerName, + short apiVersion, + Errors topLevelError, + TopicPartition topicPartition, + Errors partitionLevelError, + int leaderEpoch, + int leaderId, + Endpoints endpoints ) { BeginQuorumEpochResponseData response = new BeginQuorumEpochResponseData() - .setErrorCode(topLevelError.code()) - .setTopics( - Collections.singletonList( - new BeginQuorumEpochResponseData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new BeginQuorumEpochResponseData.PartitionData() - .setErrorCode(partitionLevelError.code()) - .setLeaderId(leaderId) - .setLeaderEpoch(leaderEpoch) - ) - ) + .setErrorCode(topLevelError.code()) + .setTopics( + Collections.singletonList( + new BeginQuorumEpochResponseData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new BeginQuorumEpochResponseData.PartitionData() + .setErrorCode(partitionLevelError.code()) + .setLeaderId(leaderId) + .setLeaderEpoch(leaderEpoch) + ) ) - ); + ) + ); if (apiVersion >= 1) { Optional address = endpoints.address(listenerName); if (address.isPresent() && leaderId >= 0) { // Populate the node endpoints BeginQuorumEpochResponseData.NodeEndpointCollection nodeEndpoints = - new BeginQuorumEpochResponseData.NodeEndpointCollection(1); + new BeginQuorumEpochResponseData.NodeEndpointCollection(1); nodeEndpoints.add( - new BeginQuorumEpochResponseData.NodeEndpoint() - .setNodeId(leaderId) - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()) + new BeginQuorumEpochResponseData.NodeEndpoint() + .setNodeId(leaderId) + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()) ); response.setNodeEndpoints(nodeEndpoints); } @@ -107,21 +107,21 @@ public static BeginQuorumEpochResponseData singletonBeginQuorumEpochResponse( public static boolean hasValidTopicPartition(BeginQuorumEpochRequestData data, TopicPartition topicPartition) { return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } public static boolean hasValidTopicPartition(BeginQuorumEpochResponseData data, TopicPartition topicPartition) { return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } public static Optional beginQuorumEpochRequestVoterKey( - BeginQuorumEpochRequestData request, - BeginQuorumEpochRequestData.PartitionData partition + BeginQuorumEpochRequestData request, + BeginQuorumEpochRequestData.PartitionData partition ) { if (request.voterId() < 0) { return Optional.empty(); diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/DescribeQuorumRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/DescribeQuorumRpc.java index f0dc34a391a04..9b695513410a4 100644 --- a/raft/src/main/java/org/apache/kafka/raft/utils/DescribeQuorumRpc.java +++ b/raft/src/main/java/org/apache/kafka/raft/utils/DescribeQuorumRpc.java @@ -31,55 +31,55 @@ public class DescribeQuorumRpc { public static DescribeQuorumRequestData singletonDescribeQuorumRequest( - TopicPartition topicPartition + TopicPartition topicPartition ) { return new DescribeQuorumRequestData() - .setTopics( - Collections.singletonList( - new DescribeQuorumRequestData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new DescribeQuorumRequestData.PartitionData() - .setPartitionIndex(topicPartition.partition()) - ) - ) + .setTopics( + Collections.singletonList( + new DescribeQuorumRequestData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new DescribeQuorumRequestData.PartitionData() + .setPartitionIndex(topicPartition.partition()) + ) ) - ); + ) + ); } public static DescribeQuorumResponseData singletonDescribeQuorumResponse( - short apiVersion, - TopicPartition topicPartition, - int leaderId, - int leaderEpoch, - long highWatermark, - Collection voters, - Collection observers, - long currentTimeMs + short apiVersion, + TopicPartition topicPartition, + int leaderId, + int leaderEpoch, + long highWatermark, + Collection voters, + Collection observers, + long currentTimeMs ) { DescribeQuorumResponseData response = new DescribeQuorumResponseData() - .setTopics( - Collections.singletonList( - new DescribeQuorumResponseData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new DescribeQuorumResponseData.PartitionData() - .setPartitionIndex(topicPartition.partition()) - .setErrorCode(Errors.NONE.code()) - .setLeaderId(leaderId) - .setLeaderEpoch(leaderEpoch) - .setHighWatermark(highWatermark) - .setCurrentVoters(toReplicaStates(apiVersion, leaderId, voters, currentTimeMs)) - .setObservers(toReplicaStates(apiVersion, leaderId, observers, currentTimeMs)))))); + .setTopics( + Collections.singletonList( + new DescribeQuorumResponseData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new DescribeQuorumResponseData.PartitionData() + .setPartitionIndex(topicPartition.partition()) + .setErrorCode(Errors.NONE.code()) + .setLeaderId(leaderId) + .setLeaderEpoch(leaderEpoch) + .setHighWatermark(highWatermark) + .setCurrentVoters(toReplicaStates(apiVersion, leaderId, voters, currentTimeMs)) + .setObservers(toReplicaStates(apiVersion, leaderId, observers, currentTimeMs)))))); if (apiVersion >= 2) { DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(voters.size()); for (LeaderState.ReplicaState voter : voters) { nodes.add( - new DescribeQuorumResponseData.Node() - .setNodeId(voter.replicaKey().id()) - .setListeners(voter.listeners().toDescribeQuorumResponseListeners()) + new DescribeQuorumResponseData.Node() + .setNodeId(voter.replicaKey().id()) + .setListeners(voter.listeners().toDescribeQuorumResponseListeners()) ); } response.setNodes(nodes); @@ -88,22 +88,22 @@ public static DescribeQuorumResponseData singletonDescribeQuorumResponse( } private static List toReplicaStates( - short apiVersion, - int leaderId, - Collection states, - long currentTimeMs + short apiVersion, + int leaderId, + Collection states, + long currentTimeMs ) { return states - .stream() - .map(replicaState -> toReplicaState(apiVersion, leaderId, replicaState, currentTimeMs)) - .collect(Collectors.toList()); + .stream() + .map(replicaState -> toReplicaState(apiVersion, leaderId, replicaState, currentTimeMs)) + .collect(Collectors.toList()); } private static DescribeQuorumResponseData.ReplicaState toReplicaState( - short apiVersion, - int leaderId, - LeaderState.ReplicaState replicaState, - long currentTimeMs + short apiVersion, + int leaderId, + LeaderState.ReplicaState replicaState, + long currentTimeMs ) { final long lastCaughtUpTimestamp; final long lastFetchTimestamp; @@ -115,10 +115,10 @@ private static DescribeQuorumResponseData.ReplicaState toReplicaState( lastFetchTimestamp = replicaState.lastFetchTimestamp(); } DescribeQuorumResponseData.ReplicaState replicaStateData = new DescribeQuorumResponseData.ReplicaState() - .setReplicaId(replicaState.replicaKey().id()) - .setLogEndOffset(replicaState.endOffset().map(LogOffsetMetadata::offset).orElse(-1L)) - .setLastCaughtUpTimestamp(lastCaughtUpTimestamp) - .setLastFetchTimestamp(lastFetchTimestamp); + .setReplicaId(replicaState.replicaKey().id()) + .setLogEndOffset(replicaState.endOffset().map(LogOffsetMetadata::offset).orElse(-1L)) + .setLastCaughtUpTimestamp(lastCaughtUpTimestamp) + .setLastFetchTimestamp(lastFetchTimestamp); if (apiVersion >= 2) { replicaStateData.setReplicaDirectoryId(replicaState.replicaKey().directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); @@ -128,8 +128,8 @@ private static DescribeQuorumResponseData.ReplicaState toReplicaState( public static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicPartition topicPartition) { return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/DynamicReconfigRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/DynamicReconfigRpc.java index 4c2531ddbf8ef..78f1069995326 100644 --- a/raft/src/main/java/org/apache/kafka/raft/utils/DynamicReconfigRpc.java +++ b/raft/src/main/java/org/apache/kafka/raft/utils/DynamicReconfigRpc.java @@ -34,90 +34,90 @@ public class DynamicReconfigRpc { public static AddRaftVoterRequestData addVoterRequest( - String clusterId, - int timeoutMs, - ReplicaKey voter, - Endpoints listeners + String clusterId, + int timeoutMs, + ReplicaKey voter, + Endpoints listeners ) { return new AddRaftVoterRequestData() - .setClusterId(clusterId) - .setTimeoutMs(timeoutMs) - .setVoterId(voter.id()) - .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) - .setListeners(listeners.toAddVoterRequest()); + .setClusterId(clusterId) + .setTimeoutMs(timeoutMs) + .setVoterId(voter.id()) + .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + .setListeners(listeners.toAddVoterRequest()); } public static AddRaftVoterResponseData addVoterResponse( - Errors error, - String errorMessage + Errors error, + String errorMessage ) { errorMessage = errorMessage == null ? error.message() : errorMessage; return new AddRaftVoterResponseData() - .setErrorCode(error.code()) - .setErrorMessage(errorMessage); + .setErrorCode(error.code()) + .setErrorMessage(errorMessage); } public static RemoveRaftVoterRequestData removeVoterRequest( - String clusterId, - ReplicaKey voter + String clusterId, + ReplicaKey voter ) { return new RemoveRaftVoterRequestData() - .setClusterId(clusterId) - .setVoterId(voter.id()) - .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); + .setClusterId(clusterId) + .setVoterId(voter.id()) + .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); } public static RemoveRaftVoterResponseData removeVoterResponse( - Errors error, - String errorMessage + Errors error, + String errorMessage ) { errorMessage = errorMessage == null ? error.message() : errorMessage; return new RemoveRaftVoterResponseData() - .setErrorCode(error.code()) - .setErrorMessage(errorMessage); + .setErrorCode(error.code()) + .setErrorMessage(errorMessage); } public static UpdateRaftVoterRequestData updateVoterRequest( - String clusterId, - ReplicaKey voter, - int epoch, - SupportedVersionRange supportedVersions, - Endpoints endpoints + String clusterId, + ReplicaKey voter, + int epoch, + SupportedVersionRange supportedVersions, + Endpoints endpoints ) { UpdateRaftVoterRequestData request = new UpdateRaftVoterRequestData() - .setClusterId(clusterId) - .setCurrentLeaderEpoch(epoch) - .setVoterId(voter.id()) - .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) - .setListeners(endpoints.toUpdateVoterRequest()); + .setClusterId(clusterId) + .setCurrentLeaderEpoch(epoch) + .setVoterId(voter.id()) + .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + .setListeners(endpoints.toUpdateVoterRequest()); request.kRaftVersionFeature() - .setMinSupportedVersion(supportedVersions.min()) - .setMaxSupportedVersion(supportedVersions.max()); + .setMinSupportedVersion(supportedVersions.min()) + .setMaxSupportedVersion(supportedVersions.max()); return request; } public static UpdateRaftVoterResponseData updateVoterResponse( - Errors error, - ListenerName listenerName, - LeaderAndEpoch leaderAndEpoch, - Endpoints endpoints + Errors error, + ListenerName listenerName, + LeaderAndEpoch leaderAndEpoch, + Endpoints endpoints ) { UpdateRaftVoterResponseData response = new UpdateRaftVoterResponseData() - .setErrorCode(error.code()); + .setErrorCode(error.code()); response.currentLeader() - .setLeaderId(leaderAndEpoch.leaderId().orElse(-1)) - .setLeaderEpoch(leaderAndEpoch.epoch()); + .setLeaderId(leaderAndEpoch.leaderId().orElse(-1)) + .setLeaderEpoch(leaderAndEpoch.epoch()); Optional address = endpoints.address(listenerName); if (address.isPresent()) { response.currentLeader() - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()); + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()); } return response; diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/EndQuorumEpochRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/EndQuorumEpochRpc.java index 86701023eafca..7ecf8ea8e73e5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/utils/EndQuorumEpochRpc.java +++ b/raft/src/main/java/org/apache/kafka/raft/utils/EndQuorumEpochRpc.java @@ -32,79 +32,79 @@ public class EndQuorumEpochRpc { public static EndQuorumEpochRequestData singletonEndQuorumEpochRequest( - TopicPartition topicPartition, - String clusterId, - int leaderEpoch, - int leaderId, - List preferredReplicaKeys + TopicPartition topicPartition, + String clusterId, + int leaderEpoch, + int leaderId, + List preferredReplicaKeys ) { List preferredSuccessors = preferredReplicaKeys - .stream() - .map(ReplicaKey::id) - .collect(Collectors.toList()); + .stream() + .map(ReplicaKey::id) + .collect(Collectors.toList()); List preferredCandidates = preferredReplicaKeys - .stream() - .map(replicaKey -> new EndQuorumEpochRequestData.ReplicaInfo() - .setCandidateId(replicaKey.id()) - .setCandidateDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) - ) - .collect(Collectors.toList()); + .stream() + .map(replicaKey -> new EndQuorumEpochRequestData.ReplicaInfo() + .setCandidateId(replicaKey.id()) + .setCandidateDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + ) + .collect(Collectors.toList()); return new EndQuorumEpochRequestData() - .setClusterId(clusterId) - .setTopics( - Collections.singletonList( - new EndQuorumEpochRequestData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new EndQuorumEpochRequestData.PartitionData() - .setPartitionIndex(topicPartition.partition()) - .setLeaderEpoch(leaderEpoch) - .setLeaderId(leaderId) - .setPreferredSuccessors(preferredSuccessors) - .setPreferredCandidates(preferredCandidates) - ) - ) + .setClusterId(clusterId) + .setTopics( + Collections.singletonList( + new EndQuorumEpochRequestData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new EndQuorumEpochRequestData.PartitionData() + .setPartitionIndex(topicPartition.partition()) + .setLeaderEpoch(leaderEpoch) + .setLeaderId(leaderId) + .setPreferredSuccessors(preferredSuccessors) + .setPreferredCandidates(preferredCandidates) + ) ) - ); + ) + ); } public static EndQuorumEpochResponseData singletonEndQuorumEpochResponse( - ListenerName listenerName, - short apiVersion, - Errors topLevelError, - TopicPartition topicPartition, - Errors partitionLevelError, - int leaderEpoch, - int leaderId, - Endpoints endpoints + ListenerName listenerName, + short apiVersion, + Errors topLevelError, + TopicPartition topicPartition, + Errors partitionLevelError, + int leaderEpoch, + int leaderId, + Endpoints endpoints ) { EndQuorumEpochResponseData response = new EndQuorumEpochResponseData() - .setErrorCode(topLevelError.code()) - .setTopics(Collections.singletonList( - new EndQuorumEpochResponseData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions(Collections.singletonList( - new EndQuorumEpochResponseData.PartitionData() - .setErrorCode(partitionLevelError.code()) - .setLeaderId(leaderId) - .setLeaderEpoch(leaderEpoch) - ))) - ); + .setErrorCode(topLevelError.code()) + .setTopics(Collections.singletonList( + new EndQuorumEpochResponseData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions(Collections.singletonList( + new EndQuorumEpochResponseData.PartitionData() + .setErrorCode(partitionLevelError.code()) + .setLeaderId(leaderId) + .setLeaderEpoch(leaderEpoch) + ))) + ); if (apiVersion >= 1) { Optional address = endpoints.address(listenerName); if (address.isPresent() && leaderId >= 0) { // Populate the node endpoints EndQuorumEpochResponseData.NodeEndpointCollection nodeEndpoints = - new EndQuorumEpochResponseData.NodeEndpointCollection(1); + new EndQuorumEpochResponseData.NodeEndpointCollection(1); nodeEndpoints.add( - new EndQuorumEpochResponseData.NodeEndpoint() - .setNodeId(leaderId) - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()) + new EndQuorumEpochResponseData.NodeEndpoint() + .setNodeId(leaderId) + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()) ); response.setNodeEndpoints(nodeEndpoints); } @@ -115,15 +115,15 @@ public static EndQuorumEpochResponseData singletonEndQuorumEpochResponse( public static boolean hasValidTopicPartition(EndQuorumEpochRequestData data, TopicPartition topicPartition) { return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } public static boolean hasValidTopicPartition(EndQuorumEpochResponseData data, TopicPartition topicPartition) { return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/FetchRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/FetchRpc.java index 8e5ed5b685fa0..fb1e8d90cc76a 100644 --- a/raft/src/main/java/org/apache/kafka/raft/utils/FetchRpc.java +++ b/raft/src/main/java/org/apache/kafka/raft/utils/FetchRpc.java @@ -31,47 +31,47 @@ public class FetchRpc { public static FetchRequestData singletonFetchRequest( - TopicPartition topicPartition, - Uuid topicId, - Consumer partitionConsumer + TopicPartition topicPartition, + Uuid topicId, + Consumer partitionConsumer ) { FetchRequestData.FetchPartition fetchPartition = - new FetchRequestData.FetchPartition() - .setPartition(topicPartition.partition()); + new FetchRequestData.FetchPartition() + .setPartition(topicPartition.partition()); partitionConsumer.accept(fetchPartition); FetchRequestData.FetchTopic fetchTopic = - new FetchRequestData.FetchTopic() - .setTopic(topicPartition.topic()) - .setTopicId(topicId) - .setPartitions(Collections.singletonList(fetchPartition)); + new FetchRequestData.FetchTopic() + .setTopic(topicPartition.topic()) + .setTopicId(topicId) + .setPartitions(Collections.singletonList(fetchPartition)); return new FetchRequestData() - .setTopics(Collections.singletonList(fetchTopic)); + .setTopics(Collections.singletonList(fetchTopic)); } public static FetchResponseData singletonFetchResponse( - ListenerName listenerName, - short apiVersion, - TopicPartition topicPartition, - Uuid topicId, - Errors topLevelError, - int leaderId, - Endpoints endpoints, - Consumer partitionConsumer + ListenerName listenerName, + short apiVersion, + TopicPartition topicPartition, + Uuid topicId, + Errors topLevelError, + int leaderId, + Endpoints endpoints, + Consumer partitionConsumer ) { FetchResponseData.PartitionData fetchablePartition = - new FetchResponseData.PartitionData(); + new FetchResponseData.PartitionData(); fetchablePartition.setPartitionIndex(topicPartition.partition()); partitionConsumer.accept(fetchablePartition); FetchResponseData.FetchableTopicResponse fetchableTopic = - new FetchResponseData.FetchableTopicResponse() - .setTopic(topicPartition.topic()) - .setTopicId(topicId) - .setPartitions(Collections.singletonList(fetchablePartition)); + new FetchResponseData.FetchableTopicResponse() + .setTopic(topicPartition.topic()) + .setTopicId(topicId) + .setPartitions(Collections.singletonList(fetchablePartition)); FetchResponseData response = new FetchResponseData(); @@ -81,31 +81,31 @@ public static FetchResponseData singletonFetchResponse( // Populate the node endpoints FetchResponseData.NodeEndpointCollection nodeEndpoints = new FetchResponseData.NodeEndpointCollection(1); nodeEndpoints.add( - new FetchResponseData.NodeEndpoint() - .setNodeId(leaderId) - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()) + new FetchResponseData.NodeEndpoint() + .setNodeId(leaderId) + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()) ); response.setNodeEndpoints(nodeEndpoints); } } return response - .setErrorCode(topLevelError.code()) - .setResponses(Collections.singletonList(fetchableTopic)); + .setErrorCode(topLevelError.code()) + .setResponses(Collections.singletonList(fetchableTopic)); } public static boolean hasValidTopicPartition(FetchRequestData data, TopicPartition topicPartition, Uuid topicId) { return data.topics().size() == 1 && - data.topics().get(0).topicId().equals(topicId) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partition() == topicPartition.partition(); + data.topics().get(0).topicId().equals(topicId) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partition() == topicPartition.partition(); } public static boolean hasValidTopicPartition(FetchResponseData data, TopicPartition topicPartition, Uuid topicId) { return data.responses().size() == 1 && - data.responses().get(0).topicId().equals(topicId) && - data.responses().get(0).partitions().size() == 1 && - data.responses().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + data.responses().get(0).topicId().equals(topicId) && + data.responses().get(0).partitions().size() == 1 && + data.responses().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/FetchSnapshotRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/FetchSnapshotRpc.java index 30b3280efcf13..5a3568429dd47 100644 --- a/raft/src/main/java/org/apache/kafka/raft/utils/FetchSnapshotRpc.java +++ b/raft/src/main/java/org/apache/kafka/raft/utils/FetchSnapshotRpc.java @@ -31,83 +31,83 @@ public class FetchSnapshotRpc { public static FetchSnapshotRequestData singletonFetchSnapshotRequest( - String clusterId, - ReplicaKey replicaKey, - TopicPartition topicPartition, - int epoch, - OffsetAndEpoch offsetAndEpoch, - int maxBytes, - long position + String clusterId, + ReplicaKey replicaKey, + TopicPartition topicPartition, + int epoch, + OffsetAndEpoch offsetAndEpoch, + int maxBytes, + long position ) { FetchSnapshotRequestData.SnapshotId snapshotId = new FetchSnapshotRequestData.SnapshotId() - .setEndOffset(offsetAndEpoch.offset()) - .setEpoch(offsetAndEpoch.epoch()); + .setEndOffset(offsetAndEpoch.offset()) + .setEpoch(offsetAndEpoch.epoch()); FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = new FetchSnapshotRequestData.PartitionSnapshot() - .setPartition(topicPartition.partition()) - .setCurrentLeaderEpoch(epoch) - .setSnapshotId(snapshotId) - .setPosition(position) - .setReplicaDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); + .setPartition(topicPartition.partition()) + .setCurrentLeaderEpoch(epoch) + .setSnapshotId(snapshotId) + .setPosition(position) + .setReplicaDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); return new FetchSnapshotRequestData() - .setClusterId(clusterId) - .setReplicaId(replicaKey.id()) - .setMaxBytes(maxBytes) - .setTopics( - Collections.singletonList( - new FetchSnapshotRequestData.TopicSnapshot() - .setName(topicPartition.topic()) - .setPartitions(Collections.singletonList(partitionSnapshot)) - ) - ); + .setClusterId(clusterId) + .setReplicaId(replicaKey.id()) + .setMaxBytes(maxBytes) + .setTopics( + Collections.singletonList( + new FetchSnapshotRequestData.TopicSnapshot() + .setName(topicPartition.topic()) + .setPartitions(Collections.singletonList(partitionSnapshot)) + ) + ); } /** * Creates a FetchSnapshotResponseData with a single PartitionSnapshot for the topic partition. - * + *

* The partition index will already be populated when calling operator. * - * @param listenerName the listener used to accept the request - * @param apiVersion the api version of the request + * @param listenerName the listener used to accept the request + * @param apiVersion the api version of the request * @param topicPartition the topic partition to include - * @param leaderId the id of the leader - * @param endpoints the endpoints of the leader - * @param operator unary operator responsible for populating all of the appropriate fields + * @param leaderId the id of the leader + * @param endpoints the endpoints of the leader + * @param operator unary operator responsible for populating all of the appropriate fields * @return the created fetch snapshot response data */ public static FetchSnapshotResponseData singletonFetchSnapshotResponse( - ListenerName listenerName, - short apiVersion, - TopicPartition topicPartition, - int leaderId, - Endpoints endpoints, - UnaryOperator operator + ListenerName listenerName, + short apiVersion, + TopicPartition topicPartition, + int leaderId, + Endpoints endpoints, + UnaryOperator operator ) { FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot = operator.apply( - new FetchSnapshotResponseData.PartitionSnapshot().setIndex(topicPartition.partition()) + new FetchSnapshotResponseData.PartitionSnapshot().setIndex(topicPartition.partition()) ); FetchSnapshotResponseData response = new FetchSnapshotResponseData() - .setTopics( - Collections.singletonList( - new FetchSnapshotResponseData.TopicSnapshot() - .setName(topicPartition.topic()) - .setPartitions(Collections.singletonList(partitionSnapshot)) - ) - ); + .setTopics( + Collections.singletonList( + new FetchSnapshotResponseData.TopicSnapshot() + .setName(topicPartition.topic()) + .setPartitions(Collections.singletonList(partitionSnapshot)) + ) + ); if (apiVersion >= 1) { Optional address = endpoints.address(listenerName); if (address.isPresent() && leaderId >= 0) { // Populate the node endpoints FetchSnapshotResponseData.NodeEndpointCollection nodeEndpoints = - new FetchSnapshotResponseData.NodeEndpointCollection(1); + new FetchSnapshotResponseData.NodeEndpointCollection(1); nodeEndpoints.add( - new FetchSnapshotResponseData.NodeEndpoint() - .setNodeId(leaderId) - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()) + new FetchSnapshotResponseData.NodeEndpoint() + .setNodeId(leaderId) + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()) ); response.setNodeEndpoints(nodeEndpoints); } diff --git a/raft/src/main/java/org/apache/kafka/raft/utils/VoteRpc.java b/raft/src/main/java/org/apache/kafka/raft/utils/VoteRpc.java index fd0ddbf8b4fcb..1085c90828bb2 100644 --- a/raft/src/main/java/org/apache/kafka/raft/utils/VoteRpc.java +++ b/raft/src/main/java/org/apache/kafka/raft/utils/VoteRpc.java @@ -30,69 +30,69 @@ public class VoteRpc { public static VoteRequestData singletonVoteRequest( - TopicPartition topicPartition, - String clusterId, - int replicaEpoch, - ReplicaKey replicaKey, - ReplicaKey voterKey, - int lastEpoch, - long lastEpochEndOffset, - boolean preVote + TopicPartition topicPartition, + String clusterId, + int replicaEpoch, + ReplicaKey replicaKey, + ReplicaKey voterKey, + int lastEpoch, + long lastEpochEndOffset, + boolean preVote ) { return new VoteRequestData() - .setClusterId(clusterId) - .setVoterId(voterKey.id()) - .setTopics( - Collections.singletonList( - new VoteRequestData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions( - Collections.singletonList( - new VoteRequestData.PartitionData() - .setPartitionIndex(topicPartition.partition()) - .setReplicaEpoch(replicaEpoch) - .setReplicaId(replicaKey.id()) - .setReplicaDirectoryId( - replicaKey - .directoryId() - .orElse(ReplicaKey.NO_DIRECTORY_ID) - ) - .setVoterDirectoryId( - voterKey - .directoryId() - .orElse(ReplicaKey.NO_DIRECTORY_ID) - ) - .setLastOffsetEpoch(lastEpoch) - .setLastOffset(lastEpochEndOffset) - .setPreVote(preVote) - ) - ) + .setClusterId(clusterId) + .setVoterId(voterKey.id()) + .setTopics( + Collections.singletonList( + new VoteRequestData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions( + Collections.singletonList( + new VoteRequestData.PartitionData() + .setPartitionIndex(topicPartition.partition()) + .setReplicaEpoch(replicaEpoch) + .setReplicaId(replicaKey.id()) + .setReplicaDirectoryId( + replicaKey + .directoryId() + .orElse(ReplicaKey.NO_DIRECTORY_ID) + ) + .setVoterDirectoryId( + voterKey + .directoryId() + .orElse(ReplicaKey.NO_DIRECTORY_ID) + ) + .setLastOffsetEpoch(lastEpoch) + .setLastOffset(lastEpochEndOffset) + .setPreVote(preVote) + ) ) - ); + ) + ); } public static VoteResponseData singletonVoteResponse( - ListenerName listenerName, - short apiVersion, - Errors topLevelError, - TopicPartition topicPartition, - Errors partitionLevelError, - int leaderEpoch, - int leaderId, - boolean voteGranted, - Endpoints endpoints + ListenerName listenerName, + short apiVersion, + Errors topLevelError, + TopicPartition topicPartition, + Errors partitionLevelError, + int leaderEpoch, + int leaderId, + boolean voteGranted, + Endpoints endpoints ) { VoteResponseData response = new VoteResponseData() - .setErrorCode(topLevelError.code()) - .setTopics(Collections.singletonList( - new VoteResponseData.TopicData() - .setTopicName(topicPartition.topic()) - .setPartitions(Collections.singletonList( - new VoteResponseData.PartitionData() - .setErrorCode(partitionLevelError.code()) - .setLeaderId(leaderId) - .setLeaderEpoch(leaderEpoch) - .setVoteGranted(voteGranted))))); + .setErrorCode(topLevelError.code()) + .setTopics(Collections.singletonList( + new VoteResponseData.TopicData() + .setTopicName(topicPartition.topic()) + .setPartitions(Collections.singletonList( + new VoteResponseData.PartitionData() + .setErrorCode(partitionLevelError.code()) + .setLeaderId(leaderId) + .setLeaderEpoch(leaderEpoch) + .setVoteGranted(voteGranted))))); if (apiVersion >= 1) { Optional address = endpoints.address(listenerName); @@ -100,10 +100,10 @@ public static VoteResponseData singletonVoteResponse( // Populate the node endpoints VoteResponseData.NodeEndpointCollection nodeEndpoints = new VoteResponseData.NodeEndpointCollection(1); nodeEndpoints.add( - new VoteResponseData.NodeEndpoint() - .setNodeId(leaderId) - .setHost(address.get().getHostString()) - .setPort(address.get().getPort()) + new VoteResponseData.NodeEndpoint() + .setNodeId(leaderId) + .setHost(address.get().getHostString()) + .setPort(address.get().getPort()) ); response.setNodeEndpoints(nodeEndpoints); } @@ -113,8 +113,8 @@ public static VoteResponseData singletonVoteResponse( } public static Optional voteRequestVoterKey( - VoteRequestData request, - VoteRequestData.PartitionData partition + VoteRequestData request, + VoteRequestData.PartitionData partition ) { if (request.voterId() < 0) { return Optional.empty(); @@ -125,15 +125,15 @@ public static Optional voteRequestVoterKey( public static boolean hasValidTopicPartition(VoteResponseData data, TopicPartition topicPartition) { return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } public static boolean hasValidTopicPartition(VoteRequestData data, TopicPartition topicPartition) { return data.topics().size() == 1 && - data.topics().get(0).topicName().equals(topicPartition.topic()) && - data.topics().get(0).partitions().size() == 1 && - data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); + data.topics().get(0).topicName().equals(topicPartition.topic()) && + data.topics().get(0).partitions().size() == 1 && + data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } }