Commit da46cf6
KAFKA-17565 Move MetadataCache interface to metadata module (apache#18801)
### Changes

* Move the MetadataCache interface to the metadata module and convert its Scala functions to Java.
* Remove the functions `getTopicPartitions`, `getAliveBrokers`, `topicNamesToIds`, `topicIdInfo`, and `getClusterMetadata` from the MetadataCache interface, because they are only used in test code.

### Performance

* ReplicaFetcherThreadBenchmark

  ```
  ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.fetcher.ReplicaFetcherThreadBenchmark
  ```

  * trunk

    ```
    Benchmark                                  (partitionCount)  Mode  Cnt       Score  Error  Units
    ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    2    4775.490         ns/op
    ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    2   25730.790         ns/op
    ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    2   55334.206         ns/op
    ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    2  488427.547         ns/op
    ```

  * branch

    ```
    Benchmark                                  (partitionCount)  Mode  Cnt       Score  Error  Units
    ReplicaFetcherThreadBenchmark.testFetcher               100  avgt    2    4825.219         ns/op
    ReplicaFetcherThreadBenchmark.testFetcher               500  avgt    2   25985.662         ns/op
    ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt    2   56056.005         ns/op
    ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt    2  497138.573         ns/op
    ```

* KRaftMetadataRequestBenchmark

  ```
  ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.metadata.KRaftMetadataRequestBenchmark
  ```

  * trunk

    ```
    Benchmark                                                      (partitionCount)  (topicCount)  Mode  Cnt         Score  Error  Units
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10           500  avgt    2    884933.558         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          1000  avgt    2   1910054.621         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          5000  avgt    2  21778869.337         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20           500  avgt    2   1537550.670         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          1000  avgt    2   3168237.805         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          5000  avgt    2  29699652.466         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50           500  avgt    2   3501483.852         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          1000  avgt    2   7405481.182         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          5000  avgt    2  55839670.124         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10           500  avgt    2       333.667         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10          1000  avgt    2       339.685         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10          5000  avgt    2       334.293         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20           500  avgt    2       329.899         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20          1000  avgt    2       347.537         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20          5000  avgt    2       332.781         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50           500  avgt    2       327.085         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50          1000  avgt    2       325.206         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50          5000  avgt    2       316.758         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10           500  avgt    2         7.569         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10          1000  avgt    2         7.565         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10          5000  avgt    2         7.574         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20           500  avgt    2         7.568         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20          1000  avgt    2         7.557         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20          5000  avgt    2         7.585         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50           500  avgt    2         7.560         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50          1000  avgt    2         7.554         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50          5000  avgt    2         7.574         ns/op
    ```

  * branch

    ```
    Benchmark                                                      (partitionCount)  (topicCount)  Mode  Cnt         Score  Error  Units
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10           500  avgt    2    910337.770         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          1000  avgt    2   1902351.360         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                10          5000  avgt    2  22215893.338         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20           500  avgt    2   1572683.875         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          1000  avgt    2   3188560.081         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                20          5000  avgt    2  29984751.632         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50           500  avgt    2   3413567.549         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          1000  avgt    2   7303174.254         ns/op
    KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics                50          5000  avgt    2  54293721.640         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10           500  avgt    2       318.335         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10          1000  avgt    2       331.386         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              10          5000  avgt    2       332.944         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20           500  avgt    2       340.322         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20          1000  avgt    2       330.294         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              20          5000  avgt    2       342.154         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50           500  avgt    2       341.053         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50          1000  avgt    2       335.458         ns/op
    KRaftMetadataRequestBenchmark.testRequestToJson                              50          5000  avgt    2       322.050         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10           500  avgt    2         7.538         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10          1000  avgt    2         7.548         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                10          5000  avgt    2         7.545         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20           500  avgt    2         7.597         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20          1000  avgt    2         7.567         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                20          5000  avgt    2         7.558         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50           500  avgt    2         7.559         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50          1000  avgt    2         7.615         ns/op
    KRaftMetadataRequestBenchmark.testTopicIdInfo                                50          5000  avgt    2         7.562         ns/op
    ```

* PartitionMakeFollowerBenchmark

  ```
  ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.partition.PartitionMakeFollowerBenchmark
  ```

  * trunk

    ```
    Benchmark                                        Mode  Cnt    Score  Error  Units
    PartitionMakeFollowerBenchmark.testMakeFollower  avgt    2  158.816         ns/op
    ```

  * branch

    ```
    Benchmark                                        Mode  Cnt    Score  Error  Units
    PartitionMakeFollowerBenchmark.testMakeFollower  avgt    2  160.533         ns/op
    ```

* UpdateFollowerFetchStateBenchmark

  ```
  ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.partition.UpdateFollowerFetchStateBenchmark
  ```

  * trunk

    ```
    Benchmark                                                        Mode  Cnt     Score  Error  Units
    UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBench  avgt    2  4975.261         ns/op
    UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBenchNoChange  avgt    2  4880.880  ns/op
    ```

  * branch

    ```
    Benchmark                                                        Mode  Cnt     Score  Error  Units
    UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBench  avgt    2  5020.722         ns/op
    UpdateFollowerFetchStateBenchmark.updateFollowerFetchStateBenchNoChange  avgt    2  4878.855  ns/op
    ```

* CheckpointBench

  ```
  ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.server.CheckpointBench
  ```

  * trunk

    ```
    Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score  Error   Units
    CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    2  0.997         ops/ms
    CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    2  0.703         ops/ms
    CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    2  0.486         ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    2  1.038         ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    2  0.734         ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    2  0.637         ops/ms
    ```

  * branch

    ```
    Benchmark                                         (numPartitions)  (numTopics)   Mode  Cnt  Score  Error   Units
    CheckpointBench.measureCheckpointHighWatermarks                 3          100  thrpt    2  0.990         ops/ms
    CheckpointBench.measureCheckpointHighWatermarks                 3         1000  thrpt    2  0.659         ops/ms
    CheckpointBench.measureCheckpointHighWatermarks                 3         2000  thrpt    2  0.508         ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3          100  thrpt    2  0.923         ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3         1000  thrpt    2  0.736         ops/ms
    CheckpointBench.measureCheckpointLogStartOffsets                3         2000  thrpt    2  0.637         ops/ms
    ```

* PartitionCreationBench

  ```
  ./jmh-benchmarks/jmh.sh -f 1 -i 2 -wi 2 org.apache.kafka.jmh.server.PartitionCreationBench
  ```

  * trunk

    ```
    Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt  Score  Error  Units
    PartitionCreationBench.makeFollower               20          false  avgt    2  5.997         ms/op
    PartitionCreationBench.makeFollower               20           true  avgt    2  6.961         ms/op
    ```

  * branch

    ```
    Benchmark                            (numPartitions)  (useTopicIds)  Mode  Cnt  Score  Error  Units
    PartitionCreationBench.makeFollower               20          false  avgt    2  6.212         ms/op
    PartitionCreationBench.makeFollower               20           true  avgt    2  7.005         ms/op
    ```

Reviewers: Ismael Juma <[email protected]>, David Arthur <[email protected]>, Chia-Ping Tsai <[email protected]>
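The Scala-to-Java conversion in this commit repeatedly swaps Scala `Option` idioms for their `java.util.Optional` equivalents (for example, `exists(p)` becomes `filter(p).isPresent()`, as in the `Replica.scala` and `DelayedElectLeader.scala` diffs below). A minimal standalone sketch of that mapping, using hypothetical values rather than Kafka types:

```java
import java.util.Optional;

public class OptionToOptional {
    public static void main(String[] args) {
        // Stand-ins for the broker-epoch values used in the real code.
        Optional<Long> cachedBrokerEpoch = Optional.of(7L);
        long brokerEpoch = 5L;

        // Scala `cachedBrokerEpoch.exists(_ > brokerEpoch)` becomes:
        boolean newerEpochCached = cachedBrokerEpoch.filter(e -> e > brokerEpoch).isPresent();

        // Scala `cachedBrokerEpoch.getOrElse(-1L)` becomes:
        long epochOrDefault = cachedBrokerEpoch.orElse(-1L);

        System.out.println(newerEpochCached); // true
        System.out.println(epochOrDefault);   // 7
    }
}
```

`filter(p).isPresent()` matches `exists` exactly on the empty case too: both return false when no value is present.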
1 parent a7e40b7 commit da46cf6

File tree

67 files changed: +655 −740 lines changed. (Large commits have some content hidden by default; only a subset of the diffs is shown below.)

checkstyle/import-control-core.xml (+1 −1)

```diff
@@ -55,7 +55,7 @@
   </subpackage>

   <subpackage name="coordinator">
-    <allow class="kafka.server.MetadataCache" />
+    <allow class="org.apache.kafka.metadata.MetadataCache" />
   </subpackage>

   <subpackage name="docker">
```

checkstyle/import-control-metadata.xml (+3)

```diff
@@ -144,12 +144,15 @@
   </subpackage>

   <subpackage name="metadata">
+    <allow pkg="org.apache.kafka.admin" />
     <allow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.common.acl" />
     <allow pkg="org.apache.kafka.common.annotation" />
     <allow pkg="org.apache.kafka.common.config" />
+    <allow pkg="org.apache.kafka.common.internals" />
     <allow pkg="org.apache.kafka.common.message" />
     <allow pkg="org.apache.kafka.common.metadata" />
+    <allow pkg="org.apache.kafka.common.network" />
     <allow pkg="org.apache.kafka.common.protocol" />
     <allow pkg="org.apache.kafka.common.quota" />
     <allow pkg="org.apache.kafka.common.record" />
```

core/src/main/java/kafka/server/builders/KafkaApisBuilder.java (+1 −1)

```diff
@@ -26,7 +26,6 @@
 import kafka.server.ForwardingManager;
 import kafka.server.KafkaApis;
 import kafka.server.KafkaConfig;
-import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory.QuotaManagers;
 import kafka.server.ReplicaManager;
 import kafka.server.share.SharePartitionManager;
@@ -36,6 +35,7 @@
 import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.coordinator.share.ShareCoordinator;
 import org.apache.kafka.metadata.ConfigRepository;
+import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.ClientMetricsManager;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
```

core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java (+1 −1)

```diff
@@ -20,12 +20,12 @@
 import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
 import kafka.server.KafkaConfig;
-import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory.QuotaManagers;
 import kafka.server.ReplicaManager;

 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.DelayedActionQueue;
 import org.apache.kafka.server.common.DirectoryEventHandler;
 import org.apache.kafka.server.util.Scheduler;
```

core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java (+3 −5)

```diff
@@ -20,7 +20,6 @@
 import kafka.network.RequestChannel;
 import kafka.server.AuthHelper;
 import kafka.server.KafkaConfig;
-import kafka.server.MetadataCache;

 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -31,15 +30,14 @@
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
 import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.metadata.MetadataCache;

 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Stream;

-import scala.jdk.javaapi.CollectionConverters;
-
 import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
 import static org.apache.kafka.common.resource.ResourceType.TOPIC;

@@ -65,7 +63,7 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(
         DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
         String cursorTopicName = cursor != null ? cursor.topicName() : "";
         if (fetchAllTopics) {
-            CollectionConverters.asJavaCollection(metadataCache.getAllTopics()).forEach(topicName -> {
+            metadataCache.getAllTopics().forEach(topicName -> {
                 if (topicName.compareTo(cursorTopicName) >= 0) {
                     topics.add(topicName);
                 }
@@ -105,7 +103,7 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(
         });

         DescribeTopicPartitionsResponseData response = metadataCache.describeTopicResponse(
-            CollectionConverters.asScala(authorizedTopicsStream.iterator()),
+            authorizedTopicsStream.iterator(),
             abstractRequest.context().listenerName,
             (String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0,
             Math.max(Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()), 1),
```
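With `getAllTopics` now returning a Java collection, the cursor filter above runs directly on Java types with no `CollectionConverters` round-trip. A simplified, self-contained imitation of that cursor check (class and method names here are illustrative, not the Kafka implementation):

```java
import java.util.List;
import java.util.TreeSet;

public class CursorFilter {
    // Keep only topic names at or after the cursor, in sorted order,
    // mirroring `topicName.compareTo(cursorTopicName) >= 0` above.
    static TreeSet<String> topicsFromCursor(List<String> allTopics, String cursorTopicName) {
        TreeSet<String> topics = new TreeSet<>();
        for (String topicName : allTopics) {
            if (topicName.compareTo(cursorTopicName) >= 0) {
                topics.add(topicName);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(topicsFromCursor(List.of("a", "b", "c", "d"), "c")); // [c, d]
    }
}
```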

core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java (+8 −14)

```diff
@@ -17,13 +17,12 @@

 package kafka.server.share;

-import kafka.server.MetadataCache;
-
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;

@@ -37,9 +36,6 @@
 import java.util.Set;
 import java.util.function.Function;

-import scala.jdk.javaapi.CollectionConverters;
-import scala.jdk.javaapi.OptionConverters;
-
 public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinatorMetadataCacheHelper {
     private final MetadataCache metadataCache;
     private final Function<SharePartitionKey, Integer> keyToPartitionMapper;
@@ -73,13 +69,11 @@ public Node getShareCoordinator(SharePartitionKey key, String internalTopicName)
             Set<String> topicSet = new HashSet<>();
             topicSet.add(internalTopicName);

-            List<MetadataResponseData.MetadataResponseTopic> topicMetadata = CollectionConverters.asJava(
-                metadataCache.getTopicMetadata(
-                    CollectionConverters.asScala(topicSet),
-                    interBrokerListenerName,
-                    false,
-                    false
-                )
+            List<MetadataResponseData.MetadataResponseTopic> topicMetadata = metadataCache.getTopicMetadata(
+                topicSet,
+                interBrokerListenerName,
+                false,
+                false
             );

             if (topicMetadata == null || topicMetadata.isEmpty() || topicMetadata.get(0).errorCode() != Errors.NONE.code()) {
@@ -92,7 +86,7 @@ public Node getShareCoordinator(SharePartitionKey key, String internalTopicName)
                 .findFirst();

             if (response.isPresent()) {
-                return OptionConverters.toJava(metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName))
+                return metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName)
                     .orElse(Node.noNode());
             } else {
                 return Node.noNode();
@@ -108,7 +102,7 @@ public Node getShareCoordinator(SharePartitionKey key, String internalTopicName)
     @Override
     public List<Node> getClusterNodes() {
         try {
-            return CollectionConverters.asJava(metadataCache.getAliveBrokerNodes(interBrokerListenerName).toSeq());
+            return metadataCache.getAliveBrokerNodes(interBrokerListenerName);
         } catch (Exception e) {
             log.warn("Exception while getting cluster nodes", e);
         }
```

core/src/main/scala/kafka/cluster/Partition.scala (+4 −4)

```diff
@@ -38,7 +38,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
+import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache}
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
@@ -1070,9 +1070,9 @@
     isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
   }

-  private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Option[Long]): Boolean = {
-    storedBrokerEpoch.isDefined && cachedBrokerEpoch.isDefined &&
-      (storedBrokerEpoch.get == -1 || storedBrokerEpoch == cachedBrokerEpoch)
+  private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Optional[java.lang.Long]): Boolean = {
+    storedBrokerEpoch.isDefined && cachedBrokerEpoch.isPresent() &&
+      (storedBrokerEpoch.get == -1 || storedBrokerEpoch.get == cachedBrokerEpoch.get())
   }

   /*
```
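The new signature mixes a Scala `Option[Long]` with a `java.util.Optional[java.lang.Long]`, and the comparison is rewritten to compare the unwrapped values. Scala's `==` compares values after unboxing, but the equivalent comparison written in Java must avoid reference `==` on boxed `Long`s. A small standalone illustration of that pitfall (not Kafka code):

```java
public class BoxedLongEquality {
    public static void main(String[] args) {
        // Boxed values outside the small-value cache range (-128..127).
        Long stored = 1234L;
        Long cached = 1234L;

        // Reference comparison on boxed Longs is unreliable and
        // should not be used for epoch checks:
        System.out.println(stored == cached);

        // Value comparison, which is what the eligibility check needs
        // (and what Scala's `==` performs after unboxing):
        System.out.println(stored.longValue() == cached.longValue()); // true
        System.out.println(stored.equals(cached));                    // true
    }
}
```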

core/src/main/scala/kafka/cluster/Replica.scala (+2 −2)

```diff
@@ -17,10 +17,10 @@

 package kafka.cluster

-import kafka.server.MetadataCache
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, UnifiedLog}

 import java.util.concurrent.atomic.AtomicReference
@@ -113,7 +113,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadat
     replicaState.updateAndGet { currentReplicaState =>
       val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId)
       // Fence the update if it provides a stale broker epoch.
-      if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {
+      if (brokerEpoch != -1 && cachedBrokerEpoch.filter(_ > brokerEpoch).isPresent()) {
         throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
           s"vs expected=${currentReplicaState.brokerEpoch.get}")
       }
```

core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala (+2 −1)

```diff
@@ -16,7 +16,7 @@
  */
 package kafka.coordinator.transaction

-import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager}
+import kafka.server.{KafkaConfig, ReplicaManager}
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
@@ -28,6 +28,7 @@ import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult}
 import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
 import org.apache.kafka.coordinator.transaction.ProducerIdManager
+import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import org.apache.kafka.server.util.Scheduler
```
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala (+4 −2)

```diff
@@ -21,7 +21,7 @@ import kafka.coordinator.transaction.TransactionMarkerChannelManager.{LogAppendR

 import java.util
 import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}
-import kafka.server.{KafkaConfig, MetadataCache}
+import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.kafka.clients._
 import org.apache.kafka.common.metrics.Metrics
@@ -32,12 +32,14 @@ import org.apache.kafka.common.requests.{TransactionResult, WriteTxnMarkersReque
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
+import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}

 import scala.collection.{concurrent, immutable}
 import scala.jdk.CollectionConverters._
+import scala.jdk.javaapi.OptionConverters

 object TransactionMarkerChannelManager {
   private val UnknownDestinationQueueSizeMetricName = "UnknownDestinationQueueSize"
@@ -382,7 +384,7 @@
     topicPartitions: immutable.Set[TopicPartition]): Unit = {
     val txnTopicPartition = txnStateManager.partitionFor(pendingCompleteTxn.transactionalId)
     val partitionsByDestination: immutable.Map[Option[Node], immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
-      metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName)
+      OptionConverters.toScala(metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName))
     }

     val coordinatorEpoch = pendingCompleteTxn.coordinatorEpoch
```

core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala (+2 −1)

```diff
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.server.{MetadataCache, ReplicaManager}
+import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils.{Logging, Pool}
 import org.apache.kafka.common.config.TopicConfig
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.TransactionResult
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
+import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.server.record.BrokerCompressionType
```

core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala (+2 −1)

```diff
@@ -26,6 +26,7 @@ import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartiti
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, MetadataResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}

@@ -185,7 +186,7 @@
     }
   }

-  private def getTransactionCoordinator(partition: Int): Option[Node] = {
+  private def getTransactionCoordinator(partition: Int): util.Optional[Node] = {
     metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
       .filter(_.leader != MetadataResponse.NO_LEADER_ID)
       .flatMap(metadata => metadataCache.getAliveBrokerNode(metadata.leader, interBrokerListenerName))
```
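The rewritten `getTransactionCoordinator` chains `Optional.filter` and `Optional.flatMap`, the direct analogue of the previous Scala `Option` pipeline. A self-contained sketch of the same chaining pattern (the lookup maps and names below are stand-ins, not Kafka APIs):

```java
import java.util.Map;
import java.util.Optional;

public class CoordinatorLookup {
    static final int NO_LEADER_ID = -1;
    // Hypothetical cluster state: partition -> leader id, leader id -> node.
    static final Map<Integer, Integer> leaderByPartition = Map.of(0, 101, 1, NO_LEADER_ID);
    static final Map<Integer, String> nodeById = Map.of(101, "broker-101:9092");

    // Mirrors: getLeaderAndIsr(...).filter(_.leader != NO_LEADER_ID).flatMap(getAliveBrokerNode)
    static Optional<String> coordinatorFor(int partition) {
        return Optional.ofNullable(leaderByPartition.get(partition))
            .filter(leader -> leader != NO_LEADER_ID)
            .flatMap(leader -> Optional.ofNullable(nodeById.get(leader)));
    }

    public static void main(String[] args) {
        System.out.println(coordinatorFor(0)); // Optional[broker-101:9092]
        System.out.println(coordinatorFor(1)); // Optional.empty
        System.out.println(coordinatorFor(9)); // Optional.empty
    }
}
```

Each stage short-circuits to empty, so the "no leader" and "broker not alive" cases both collapse into a single empty result, exactly as with Scala's `Option`.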

core/src/main/scala/kafka/server/ApiVersionManager.scala (+1)

```diff
@@ -20,6 +20,7 @@ import org.apache.kafka.common.feature.SupportedVersionRange
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.ApiVersionsResponse
+import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.network.metrics.RequestChannelMetrics
 import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
 import org.apache.kafka.server.common.FinalizedFeatures
```

core/src/main/scala/kafka/server/BrokerServer.scala (+1 −1)

```diff
@@ -207,7 +207,7 @@

     logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

-    metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())
+    metadataCache = new KRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())

     // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
     // until we catch up on the metadata log and have up-to-date topic and broker configs.
```

core/src/main/scala/kafka/server/ConfigHelper.scala (+1 −1)

```diff
@@ -33,7 +33,7 @@ import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
 import org.apache.kafka.coordinator.group.GroupConfig
-import org.apache.kafka.metadata.ConfigRepository
+import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms
 import org.apache.kafka.server.metrics.ClientMetricsConfigs
 import org.apache.kafka.storage.internals.log.LogConfig
```

core/src/main/scala/kafka/server/ControllerServer.scala (+1 −1)

```diff
@@ -140,7 +140,7 @@
     authorizer = config.createNewAuthorizer()
     authorizer.foreach(_.configure(config.originals))

-    metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())
+    metadataCache = new KRaftMetadataCache(config.nodeId, () => raftManager.client.kraftVersion())

     metadataCachePublisher = new KRaftMetadataCachePublisher(metadataCache)

```
core/src/main/scala/kafka/server/DelayedElectLeader.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class DelayedElectLeader(
7474
private def updateWaiting(): Unit = {
7575
val metadataCache = replicaManager.metadataCache
7676
val completedPartitions = waitingPartitions.collect {
77-
case (tp, leader) if metadataCache.getLeaderAndIsr(tp.topic, tp.partition).exists(_.leader == leader) => tp
77+
case (tp, leader) if metadataCache.getLeaderAndIsr(tp.topic, tp.partition).filter(_.leader == leader).isPresent() => tp
7878
}
7979
completedPartitions.foreach { tp =>
8080
waitingPartitions -= tp

Comments (0)