Skip to content

Commit

Permalink
KAFKA-16907: reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
frankvicky committed Jan 14, 2025
1 parent 5acb0bc commit 7b2740b
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 366 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,72 +31,72 @@

public class BeginQuorumEpochRpc {
public static BeginQuorumEpochRequestData singletonBeginQuorumEpochRequest(
TopicPartition topicPartition,
String clusterId,
int leaderEpoch,
int leaderId,
Endpoints leaderEndpoints,
ReplicaKey voterKey
TopicPartition topicPartition,
String clusterId,
int leaderEpoch,
int leaderId,
Endpoints leaderEndpoints,
ReplicaKey voterKey
) {
return new BeginQuorumEpochRequestData()
.setClusterId(clusterId)
.setVoterId(voterKey.id())
.setTopics(
Collections.singletonList(
new BeginQuorumEpochRequestData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
new BeginQuorumEpochRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setLeaderEpoch(leaderEpoch)
.setLeaderId(leaderId)
.setVoterDirectoryId(voterKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID))
)
)
.setClusterId(clusterId)
.setVoterId(voterKey.id())
.setTopics(
Collections.singletonList(
new BeginQuorumEpochRequestData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
new BeginQuorumEpochRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setLeaderEpoch(leaderEpoch)
.setLeaderId(leaderId)
.setVoterDirectoryId(voterKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID))
)
)
)
.setLeaderEndpoints(leaderEndpoints.toBeginQuorumEpochRequest());
)
.setLeaderEndpoints(leaderEndpoints.toBeginQuorumEpochRequest());
}

public static BeginQuorumEpochResponseData singletonBeginQuorumEpochResponse(
ListenerName listenerName,
short apiVersion,
Errors topLevelError,
TopicPartition topicPartition,
Errors partitionLevelError,
int leaderEpoch,
int leaderId,
Endpoints endpoints
ListenerName listenerName,
short apiVersion,
Errors topLevelError,
TopicPartition topicPartition,
Errors partitionLevelError,
int leaderEpoch,
int leaderId,
Endpoints endpoints
) {
BeginQuorumEpochResponseData response = new BeginQuorumEpochResponseData()
.setErrorCode(topLevelError.code())
.setTopics(
Collections.singletonList(
new BeginQuorumEpochResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
new BeginQuorumEpochResponseData.PartitionData()
.setErrorCode(partitionLevelError.code())
.setLeaderId(leaderId)
.setLeaderEpoch(leaderEpoch)
)
)
.setErrorCode(topLevelError.code())
.setTopics(
Collections.singletonList(
new BeginQuorumEpochResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
new BeginQuorumEpochResponseData.PartitionData()
.setErrorCode(partitionLevelError.code())
.setLeaderId(leaderId)
.setLeaderEpoch(leaderEpoch)
)
)
);
)
);

if (apiVersion >= 1) {
Optional<InetSocketAddress> address = endpoints.address(listenerName);
if (address.isPresent() && leaderId >= 0) {
// Populate the node endpoints
BeginQuorumEpochResponseData.NodeEndpointCollection nodeEndpoints =
new BeginQuorumEpochResponseData.NodeEndpointCollection(1);
new BeginQuorumEpochResponseData.NodeEndpointCollection(1);
nodeEndpoints.add(
new BeginQuorumEpochResponseData.NodeEndpoint()
.setNodeId(leaderId)
.setHost(address.get().getHostString())
.setPort(address.get().getPort())
new BeginQuorumEpochResponseData.NodeEndpoint()
.setNodeId(leaderId)
.setHost(address.get().getHostString())
.setPort(address.get().getPort())
);
response.setNodeEndpoints(nodeEndpoints);
}
Expand All @@ -107,21 +107,21 @@ public static BeginQuorumEpochResponseData singletonBeginQuorumEpochResponse(

public static boolean hasValidTopicPartition(BeginQuorumEpochRequestData data, TopicPartition topicPartition) {
return data.topics().size() == 1 &&
data.topics().get(0).topicName().equals(topicPartition.topic()) &&
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
data.topics().get(0).topicName().equals(topicPartition.topic()) &&
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
}

public static boolean hasValidTopicPartition(BeginQuorumEpochResponseData data, TopicPartition topicPartition) {
return data.topics().size() == 1 &&
data.topics().get(0).topicName().equals(topicPartition.topic()) &&
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
data.topics().get(0).topicName().equals(topicPartition.topic()) &&
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
}

public static Optional<ReplicaKey> beginQuorumEpochRequestVoterKey(
BeginQuorumEpochRequestData request,
BeginQuorumEpochRequestData.PartitionData partition
BeginQuorumEpochRequestData request,
BeginQuorumEpochRequestData.PartitionData partition
) {
if (request.voterId() < 0) {
return Optional.empty();
Expand Down
110 changes: 55 additions & 55 deletions raft/src/main/java/org/apache/kafka/raft/utils/DescribeQuorumRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,55 +31,55 @@

public class DescribeQuorumRpc {
public static DescribeQuorumRequestData singletonDescribeQuorumRequest(
TopicPartition topicPartition
TopicPartition topicPartition
) {
return new DescribeQuorumRequestData()
.setTopics(
Collections.singletonList(
new DescribeQuorumRequestData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
new DescribeQuorumRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
)
)
.setTopics(
Collections.singletonList(
new DescribeQuorumRequestData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
new DescribeQuorumRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
)
)
);
)
);
}

public static DescribeQuorumResponseData singletonDescribeQuorumResponse(
short apiVersion,
TopicPartition topicPartition,
int leaderId,
int leaderEpoch,
long highWatermark,
Collection<LeaderState.ReplicaState> voters,
Collection<LeaderState.ReplicaState> observers,
long currentTimeMs
short apiVersion,
TopicPartition topicPartition,
int leaderId,
int leaderEpoch,
long highWatermark,
Collection<LeaderState.ReplicaState> voters,
Collection<LeaderState.ReplicaState> observers,
long currentTimeMs
) {
DescribeQuorumResponseData response = new DescribeQuorumResponseData()
.setTopics(
Collections.singletonList(
new DescribeQuorumResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
new DescribeQuorumResponseData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setErrorCode(Errors.NONE.code())
.setLeaderId(leaderId)
.setLeaderEpoch(leaderEpoch)
.setHighWatermark(highWatermark)
.setCurrentVoters(toReplicaStates(apiVersion, leaderId, voters, currentTimeMs))
.setObservers(toReplicaStates(apiVersion, leaderId, observers, currentTimeMs))))));
.setTopics(
Collections.singletonList(
new DescribeQuorumResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
new DescribeQuorumResponseData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setErrorCode(Errors.NONE.code())
.setLeaderId(leaderId)
.setLeaderEpoch(leaderEpoch)
.setHighWatermark(highWatermark)
.setCurrentVoters(toReplicaStates(apiVersion, leaderId, voters, currentTimeMs))
.setObservers(toReplicaStates(apiVersion, leaderId, observers, currentTimeMs))))));
if (apiVersion >= 2) {
DescribeQuorumResponseData.NodeCollection nodes = new DescribeQuorumResponseData.NodeCollection(voters.size());
for (LeaderState.ReplicaState voter : voters) {
nodes.add(
new DescribeQuorumResponseData.Node()
.setNodeId(voter.replicaKey().id())
.setListeners(voter.listeners().toDescribeQuorumResponseListeners())
new DescribeQuorumResponseData.Node()
.setNodeId(voter.replicaKey().id())
.setListeners(voter.listeners().toDescribeQuorumResponseListeners())
);
}
response.setNodes(nodes);
Expand All @@ -88,22 +88,22 @@ public static DescribeQuorumResponseData singletonDescribeQuorumResponse(
}

private static List<DescribeQuorumResponseData.ReplicaState> toReplicaStates(
short apiVersion,
int leaderId,
Collection<LeaderState.ReplicaState> states,
long currentTimeMs
short apiVersion,
int leaderId,
Collection<LeaderState.ReplicaState> states,
long currentTimeMs
) {
return states
.stream()
.map(replicaState -> toReplicaState(apiVersion, leaderId, replicaState, currentTimeMs))
.collect(Collectors.toList());
.stream()
.map(replicaState -> toReplicaState(apiVersion, leaderId, replicaState, currentTimeMs))
.collect(Collectors.toList());
}

private static DescribeQuorumResponseData.ReplicaState toReplicaState(
short apiVersion,
int leaderId,
LeaderState.ReplicaState replicaState,
long currentTimeMs
short apiVersion,
int leaderId,
LeaderState.ReplicaState replicaState,
long currentTimeMs
) {
final long lastCaughtUpTimestamp;
final long lastFetchTimestamp;
Expand All @@ -115,10 +115,10 @@ private static DescribeQuorumResponseData.ReplicaState toReplicaState(
lastFetchTimestamp = replicaState.lastFetchTimestamp();
}
DescribeQuorumResponseData.ReplicaState replicaStateData = new DescribeQuorumResponseData.ReplicaState()
.setReplicaId(replicaState.replicaKey().id())
.setLogEndOffset(replicaState.endOffset().map(LogOffsetMetadata::offset).orElse(-1L))
.setLastCaughtUpTimestamp(lastCaughtUpTimestamp)
.setLastFetchTimestamp(lastFetchTimestamp);
.setReplicaId(replicaState.replicaKey().id())
.setLogEndOffset(replicaState.endOffset().map(LogOffsetMetadata::offset).orElse(-1L))
.setLastCaughtUpTimestamp(lastCaughtUpTimestamp)
.setLastFetchTimestamp(lastFetchTimestamp);

if (apiVersion >= 2) {
replicaStateData.setReplicaDirectoryId(replicaState.replicaKey().directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID));
Expand All @@ -128,8 +128,8 @@ private static DescribeQuorumResponseData.ReplicaState toReplicaState(

public static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicPartition topicPartition) {
return data.topics().size() == 1 &&
data.topics().get(0).topicName().equals(topicPartition.topic()) &&
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
data.topics().get(0).topicName().equals(topicPartition.topic()) &&
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
}
}
Loading

0 comments on commit 7b2740b

Please sign in to comment.