Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ZooKeeper #2244

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ Control). The metrics reporter periodically samples the Kafka raw metrics on the
* If the default broker cleanup policy is `compact`, make sure that the topic to which Cruise Control metrics
reporter should send messages is created with the `delete` cleanup policy -- the default metrics reporter topic
is `__CruiseControlMetrics`.
2. Start ZooKeeper and Kafka server ([See tutorial](https://kafka.apache.org/quickstart)).
2. Start Kafka server ([See tutorial](https://kafka.apache.org/quickstart)).
3. Modify `config/cruisecontrol.properties` of Cruise Control:
* (Required) fill in `bootstrap.servers` and `zookeeper.connect` to the Kafka cluster to be monitored.
* (Required) fill in `bootstrap.servers` to the Kafka cluster to be monitored.
* (Required) update `capacity.config.file` to the path of your capacity file.
* Capacity file is a JSON file that provides the capacity of the brokers
* You can start Cruise Control server with the default file (`config/capacityJBOD.json`), but it may not reflect the actual capacity of the brokers
Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ project(':cruise-control') {
api project(':cruise-control-core')
implementation "org.slf4j:slf4j-api:1.7.36"
implementation "org.apache.logging.log4j:log4j-slf4j-impl:2.17.2"
implementation "org.apache.zookeeper:zookeeper:${zookeeperVersion}"
implementation "io.netty:netty-handler:${nettyVersion}"
implementation "io.netty:netty-transport-native-epoll:${nettyVersion}"
api "org.apache.kafka:kafka_$scalaBinaryVersion:$kafkaVersion"
Expand Down Expand Up @@ -461,9 +460,10 @@ project(':cruise-control-metrics-reporter') {
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion:test"
testImplementation "org.apache.kafka:kafka-server-common:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-group-coordinator:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-metadata:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-raft:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation "org.apache.zookeeper:zookeeper:${zookeeperVersion}"
testOutput sourceSets.test.output
}

Expand Down
14 changes: 1 addition & 13 deletions config/cruise_control_jaas.conf_template
Original file line number Diff line number Diff line change
@@ -1,16 +1,4 @@
//Rename this file to cruise_control_jaas.conf when using secured zookeepers
//For detailed instructions, see /docs/wiki/User Guide/Secure-zookeeper-configuration.md

//Enter appropriate Client entry for secured zookeeper client connections
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/path/to/zookeeper_client.keytab"
storeKey=true
useTicketCache=false
principal="zookeeper_client@<REALM>";
};

//Rename this file to cruise_control_jaas.conf if security is enabled
//Enter appropriate KafkaClient entry if using the SASL protocol, remove if not
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
Expand Down
9 changes: 0 additions & 9 deletions config/cruisecontrol.properties
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,6 @@ num.proposal.precompute.threads=1
# Configurations for the executor
# =======================================

# The zookeeper connect of the Kafka cluster
zookeeper.connect=localhost:2181/

# If true, appropriate zookeeper Client { .. } entry required in jaas file located at $base_dir/config/cruise_control_jaas.conf
zookeeper.security.enabled=false

# The max number of partitions to move in/out on a given broker at a given time.
num.concurrent.partition.movements.per.broker=10

Expand Down Expand Up @@ -222,9 +216,6 @@ self.healing.exclude.recently.demoted.brokers=true
# True if recently removed brokers are excluded from optimizations during self healing, false otherwise
self.healing.exclude.recently.removed.brokers=true

# The zk path to store failed broker information.
failed.brokers.zk.path=/CruiseControlBrokerList

# Topic config provider class
topic.config.provider.class=com.linkedin.kafka.cruisecontrol.config.KafkaAdminTopicConfigProvider

Expand Down
7 changes: 0 additions & 7 deletions cruise-control-core/src/test/resources/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,5 @@ logger.orgApacheKafka.level=error
logger.kafka.name=kafka
logger.kafka.level=error

# zkclient can be verbose; during debugging it is common to adjust it separately
logger.zkclient.name=kafka.zk.KafkaZkClient
logger.zkclient.level=warn

logger.zookeeper.name=org.apache.zookeeper
logger.zookeeper.level=warn

rootLogger.appenderRefs=console
rootLogger.appenderRef.console.ref=STDOUT
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness {
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
Expand Down Expand Up @@ -199,14 +200,26 @@ public void testUpdatingMetricsTopicConfig() throws ExecutionException, Interrup
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG, "true");
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "2");
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
CCEmbeddedBroker broker = new CCEmbeddedBroker(brokerConfig);
// Restart broker
broker.startup();
// Wait for broker to boot up
Thread.sleep(5000);
// Check whether the topic config is updated
topicDescription = adminClient.describeTopics(Collections.singleton(TOPIC)).values().get(TOPIC).get();
assertEquals(2, topicDescription.partitions().size());
try (CCEmbeddedBroker broker = new CCEmbeddedBroker(brokerConfig)) {
// Restart broker
broker.startup();
// Check whether the topic config is updated
long startTime = System.currentTimeMillis();
boolean isTopicConfigChanged = false;
while (!isTopicConfigChanged) {
if (System.currentTimeMillis() > startTime + 60000) {
fail("Topic config was not updated");
}

TopicDescription description = adminClient.describeTopics(Collections.singleton(TOPIC)).topicNameValues().get(TOPIC).get();
isTopicConfigChanged = 2 == description.partitions().size();

try {
Thread.sleep(5000);
} catch (InterruptedException ignored) {
}
}
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
 * Copyright 2025 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
 */
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;

/**
 * Base harness for unit tests that need an embedded KRaft controller.
 * The controller is created lazily on {@link #setUp()} and disposed of in {@link #tearDown()}.
 */
public class CCAbstractKRaftTestHarness {
  protected CCEmbeddedKRaftController _controller;

  /**
   * Setup the unit test: create the embedded KRaft controller if it does not exist yet, then start it.
   */
  public void setUp() {
    if (_controller == null) {
      _controller = new CCEmbeddedKRaftController();
    }
    _controller.startup();
  }

  /**
   * Teardown the unit test: close the controller (ignoring close failures) and discard the reference.
   */
  public void tearDown() {
    if (_controller != null) {
      CCKafkaTestUtils.quietly(_controller::close);
      _controller = null;
    }
  }

  /**
   * @return The embedded KRaft controller managed by this harness (may be {@code null} before {@link #setUp()}).
   */
  protected CCEmbeddedKRaftController kraftController() {
    return _controller;
  }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,49 @@
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;

import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import kafka.metrics.KafkaMetricsReporter;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaRaftServer.CLUSTER_ID_CONFIG;

public class CCEmbeddedBroker implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(CCEmbeddedBroker.class);
private final Map<SecurityProtocol, Integer> _ports;
private final Map<SecurityProtocol, String> _hosts;
private final KafkaServer _kafkaServer;
private final CCKafkaRaftServer _kafkaServer;
private int _id;
private File _logDir;
private final List<File> _logDirs;
private File _metadataLogDir;

public CCEmbeddedBroker(Map<Object, Object> config) {
_ports = new HashMap<>();
_hosts = new HashMap<>();
_logDirs = new ArrayList<>();

try {
// Also validates the config
KafkaConfig kafkaConfig = new KafkaConfig(config, true);
parseConfigs(config);

_kafkaServer = createKafkaServer(kafkaConfig);
_kafkaServer = new CCKafkaRaftServer(kafkaConfig, config.get(CLUSTER_ID_CONFIG).toString(), Time.SYSTEM);

startup();
_ports.replaceAll((securityProtocol, port) -> {
try {
return _kafkaServer.boundPort(ListenerName.forSecurityProtocol(securityProtocol));
return _kafkaServer.boundBrokerPort(ListenerName.forSecurityProtocol(securityProtocol));
} catch (Exception e) {
throw new IllegalStateException(e);
}
Expand All @@ -59,46 +57,10 @@ public CCEmbeddedBroker(Map<Object, Object> config) {
}
}

/**
 * Creates the {@link KafkaServer} instance using the appropriate constructor for the version of Kafka on the classpath.
 * It will attempt to use the 2.8+ version first and then fall back to the 2.5+ version. If neither works, a
 * {@link NoSuchElementException} will be thrown.
 *
 * @param kafkaConfig The {@link KafkaConfig} instance to be used to create the returned {@link KafkaServer} instance.
 * @return A {@link KafkaServer} instance configured with the supplied {@link KafkaConfig}.
 * @throws ClassNotFoundException If a version of {@link KafkaServer} cannot be found on the classpath.
 */
private static KafkaServer createKafkaServer(KafkaConfig kafkaConfig) throws ClassNotFoundException {
  // The KafkaServer constructor changed in 2.8, so we need to figure out which one we are using and invoke it
  // with the correct parameters.
  KafkaServer kafkaServer = null;
  Class<?> kafkaServerClass = Class.forName(KafkaServer.class.getName());

  try {
    // Kafka 2.8+ signature: (KafkaConfig, Time, Option<String>, boolean enableForwarding)
    Constructor<?> kafka28PlusCon = kafkaServerClass.getConstructor(KafkaConfig.class, Time.class, Option.class, boolean.class);
    kafkaServer = (KafkaServer) kafka28PlusCon.newInstance(kafkaConfig, Time.SYSTEM, Option.empty(), false);
  } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
    // Fixed typo in message: "KafkaSever" -> "KafkaServer".
    LOG.debug("Unable to find Kafka 2.8+ constructor for KafkaServer class", e);
  }

  if (kafkaServer == null) {
    try {
      // Kafka 2.5+ signature: (KafkaConfig, Time, Option<String>, Seq<KafkaMetricsReporter>)
      Constructor<?> kafka25PlusCon = kafkaServerClass.getConstructor(KafkaConfig.class, Time.class, Option.class, Seq.class);
      kafkaServer = (KafkaServer) kafka25PlusCon.newInstance(kafkaConfig, Time.SYSTEM, Option.empty(), new ArrayBuffer<KafkaMetricsReporter>());
    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
      // Fixed typo in message: "KafkaSever" -> "KafkaServer".
      LOG.debug("Unable to find Kafka 2.5+ constructor for KafkaServer class", e);
    }
  }

  // Guard clause instead of if/else; also fixed typo "fo the" -> "for the" in the failure message.
  if (kafkaServer == null) {
    throw new NoSuchElementException("Unable to find viable constructor for the KafkaServer class");
  }
  return kafkaServer;
}

private void parseConfigs(Map<Object, Object> config) {
_id = Integer.parseInt((String) config.get(ServerConfigs.BROKER_ID_CONFIG));
_logDir = new File((String) config.get(ServerLogConfigs.LOG_DIR_CONFIG));
readLogDirs(config);
_id = Integer.parseInt((String) config.get(KRaftConfigs.NODE_ID_CONFIG));
_metadataLogDir = new File((String) config.get(KRaftConfigs.METADATA_LOG_DIR_CONFIG));

// Bind addresses
String listenersString = (String) config.get(SocketServerConfigs.LISTENERS_CONFIG);
Expand All @@ -115,6 +77,14 @@ private void parseConfigs(Map<Object, Object> config) {
}
}

// Reads the comma-separated log-directory setting from the broker config and
// records one File entry per configured path in _logDirs.
private void readLogDirs(Map<Object, Object> config) {
  String logDirProperty = (String) config.get(ServerLogConfigs.LOG_DIR_CONFIG);
  for (String dir : logDirProperty.split(",")) {
    _logDirs.add(new File(dir));
  }
}

public int id() {
return _id;
}
Expand Down Expand Up @@ -154,10 +124,9 @@ public void awaitShutdown() {
public void close() {
CCKafkaTestUtils.quietly(this::shutdown);
CCKafkaTestUtils.quietly(this::awaitShutdown);
CCKafkaTestUtils.quietly(() -> FileUtils.forceDelete(_logDir));
}

public static CCEmbeddedBrokerBuilder newServer() {
return new CCEmbeddedBrokerBuilder();
CCKafkaTestUtils.quietly(() -> FileUtils.forceDelete(_metadataLogDir));
for (File logDir : _logDirs) {
CCKafkaTestUtils.quietly(() -> FileUtils.forceDelete(logDir));
}
}
}
Loading
Loading