31
31
32
32
public class DescribeQuorumRpc {
33
33
public static DescribeQuorumRequestData singletonDescribeQuorumRequest (
34
- TopicPartition topicPartition
34
+ TopicPartition topicPartition
35
35
) {
36
36
return new DescribeQuorumRequestData ()
37
- .setTopics (
38
- Collections .singletonList (
39
- new DescribeQuorumRequestData .TopicData ()
40
- .setTopicName (topicPartition .topic ())
41
- .setPartitions (
42
- Collections .singletonList (
43
- new DescribeQuorumRequestData .PartitionData ()
44
- .setPartitionIndex (topicPartition .partition ())
45
- )
46
- )
37
+ .setTopics (
38
+ Collections .singletonList (
39
+ new DescribeQuorumRequestData .TopicData ()
40
+ .setTopicName (topicPartition .topic ())
41
+ .setPartitions (
42
+ Collections .singletonList (
43
+ new DescribeQuorumRequestData .PartitionData ()
44
+ .setPartitionIndex (topicPartition .partition ())
45
+ )
47
46
)
48
- );
47
+ )
48
+ );
49
49
}
50
50
51
51
public static DescribeQuorumResponseData singletonDescribeQuorumResponse (
52
- short apiVersion ,
53
- TopicPartition topicPartition ,
54
- int leaderId ,
55
- int leaderEpoch ,
56
- long highWatermark ,
57
- Collection <LeaderState .ReplicaState > voters ,
58
- Collection <LeaderState .ReplicaState > observers ,
59
- long currentTimeMs
52
+ short apiVersion ,
53
+ TopicPartition topicPartition ,
54
+ int leaderId ,
55
+ int leaderEpoch ,
56
+ long highWatermark ,
57
+ Collection <LeaderState .ReplicaState > voters ,
58
+ Collection <LeaderState .ReplicaState > observers ,
59
+ long currentTimeMs
60
60
) {
61
61
DescribeQuorumResponseData response = new DescribeQuorumResponseData ()
62
- .setTopics (
63
- Collections .singletonList (
64
- new DescribeQuorumResponseData .TopicData ()
65
- .setTopicName (topicPartition .topic ())
66
- .setPartitions (
67
- Collections .singletonList (
68
- new DescribeQuorumResponseData .PartitionData ()
69
- .setPartitionIndex (topicPartition .partition ())
70
- .setErrorCode (Errors .NONE .code ())
71
- .setLeaderId (leaderId )
72
- .setLeaderEpoch (leaderEpoch )
73
- .setHighWatermark (highWatermark )
74
- .setCurrentVoters (toReplicaStates (apiVersion , leaderId , voters , currentTimeMs ))
75
- .setObservers (toReplicaStates (apiVersion , leaderId , observers , currentTimeMs ))))));
62
+ .setTopics (
63
+ Collections .singletonList (
64
+ new DescribeQuorumResponseData .TopicData ()
65
+ .setTopicName (topicPartition .topic ())
66
+ .setPartitions (
67
+ Collections .singletonList (
68
+ new DescribeQuorumResponseData .PartitionData ()
69
+ .setPartitionIndex (topicPartition .partition ())
70
+ .setErrorCode (Errors .NONE .code ())
71
+ .setLeaderId (leaderId )
72
+ .setLeaderEpoch (leaderEpoch )
73
+ .setHighWatermark (highWatermark )
74
+ .setCurrentVoters (toReplicaStates (apiVersion , leaderId , voters , currentTimeMs ))
75
+ .setObservers (toReplicaStates (apiVersion , leaderId , observers , currentTimeMs ))))));
76
76
if (apiVersion >= 2 ) {
77
77
DescribeQuorumResponseData .NodeCollection nodes = new DescribeQuorumResponseData .NodeCollection (voters .size ());
78
78
for (LeaderState .ReplicaState voter : voters ) {
79
79
nodes .add (
80
- new DescribeQuorumResponseData .Node ()
81
- .setNodeId (voter .replicaKey ().id ())
82
- .setListeners (voter .listeners ().toDescribeQuorumResponseListeners ())
80
+ new DescribeQuorumResponseData .Node ()
81
+ .setNodeId (voter .replicaKey ().id ())
82
+ .setListeners (voter .listeners ().toDescribeQuorumResponseListeners ())
83
83
);
84
84
}
85
85
response .setNodes (nodes );
@@ -88,22 +88,22 @@ public static DescribeQuorumResponseData singletonDescribeQuorumResponse(
88
88
}
89
89
90
90
private static List <DescribeQuorumResponseData .ReplicaState > toReplicaStates (
91
- short apiVersion ,
92
- int leaderId ,
93
- Collection <LeaderState .ReplicaState > states ,
94
- long currentTimeMs
91
+ short apiVersion ,
92
+ int leaderId ,
93
+ Collection <LeaderState .ReplicaState > states ,
94
+ long currentTimeMs
95
95
) {
96
96
return states
97
- .stream ()
98
- .map (replicaState -> toReplicaState (apiVersion , leaderId , replicaState , currentTimeMs ))
99
- .collect (Collectors .toList ());
97
+ .stream ()
98
+ .map (replicaState -> toReplicaState (apiVersion , leaderId , replicaState , currentTimeMs ))
99
+ .collect (Collectors .toList ());
100
100
}
101
101
102
102
private static DescribeQuorumResponseData .ReplicaState toReplicaState (
103
- short apiVersion ,
104
- int leaderId ,
105
- LeaderState .ReplicaState replicaState ,
106
- long currentTimeMs
103
+ short apiVersion ,
104
+ int leaderId ,
105
+ LeaderState .ReplicaState replicaState ,
106
+ long currentTimeMs
107
107
) {
108
108
final long lastCaughtUpTimestamp ;
109
109
final long lastFetchTimestamp ;
@@ -115,10 +115,10 @@ private static DescribeQuorumResponseData.ReplicaState toReplicaState(
115
115
lastFetchTimestamp = replicaState .lastFetchTimestamp ();
116
116
}
117
117
DescribeQuorumResponseData .ReplicaState replicaStateData = new DescribeQuorumResponseData .ReplicaState ()
118
- .setReplicaId (replicaState .replicaKey ().id ())
119
- .setLogEndOffset (replicaState .endOffset ().map (LogOffsetMetadata ::offset ).orElse (-1L ))
120
- .setLastCaughtUpTimestamp (lastCaughtUpTimestamp )
121
- .setLastFetchTimestamp (lastFetchTimestamp );
118
+ .setReplicaId (replicaState .replicaKey ().id ())
119
+ .setLogEndOffset (replicaState .endOffset ().map (LogOffsetMetadata ::offset ).orElse (-1L ))
120
+ .setLastCaughtUpTimestamp (lastCaughtUpTimestamp )
121
+ .setLastFetchTimestamp (lastFetchTimestamp );
122
122
123
123
if (apiVersion >= 2 ) {
124
124
replicaStateData .setReplicaDirectoryId (replicaState .replicaKey ().directoryId ().orElse (ReplicaKey .NO_DIRECTORY_ID ));
@@ -128,8 +128,8 @@ private static DescribeQuorumResponseData.ReplicaState toReplicaState(
128
128
129
129
public static boolean hasValidTopicPartition (DescribeQuorumRequestData data , TopicPartition topicPartition ) {
130
130
return data .topics ().size () == 1 &&
131
- data .topics ().get (0 ).topicName ().equals (topicPartition .topic ()) &&
132
- data .topics ().get (0 ).partitions ().size () == 1 &&
133
- data .topics ().get (0 ).partitions ().get (0 ).partitionIndex () == topicPartition .partition ();
131
+ data .topics ().get (0 ).topicName ().equals (topicPartition .topic ()) &&
132
+ data .topics ().get (0 ).partitions ().size () == 1 &&
133
+ data .topics ().get (0 ).partitions ().get (0 ).partitionIndex () == topicPartition .partition ();
134
134
}
135
135
}
0 commit comments