Skip to content

Commit 8b0ef93

Browse files
mingdaoychia7712
authored andcommitted
KAFKA-18499 Clean up zookeeper from LogConfig (#18583)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 10105f9 commit 8b0ef93

File tree

4 files changed

+21
-51
lines changed

4 files changed

+21
-51
lines changed

core/src/main/scala/kafka/server/ConfigHandler.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ trait ConfigHandler {
4343
}
4444

4545
/**
46-
* The TopicConfigHandler will process topic config changes from ZooKeeper or the metadata log.
46+
* The TopicConfigHandler will process topic config changes from the metadata log.
4747
* The callback provides the topic name and the full properties set.
4848
*/
4949
class TopicConfigHandler(private val replicaManager: ReplicaManager,

core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
118118
nullTopicConfigs.mkString(","))
119119
}
120120
LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap,
121-
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
121+
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
122122
case BROKER => validateBrokerName(resource.name())
123123
case CLIENT_METRICS =>
124124
val properties = new Properties()

core/src/test/scala/unit/kafka/log/LogConfigTest.scala

+15-30
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ class LogConfigTest {
293293
props.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs.toString)
294294
props.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes.toString)
295295
assertThrows(classOf[ConfigException],
296-
() => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
296+
() => LogConfig.validate(Collections.emptyMap(), props, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
297297
}
298298

299299
@Test
@@ -305,17 +305,17 @@ class LogConfigTest {
305305
val logProps = new Properties()
306306
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
307307
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
308-
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
308+
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
309309

310310
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
311311
assertThrows(classOf[ConfigException],
312-
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
312+
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
313313
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,compact")
314314
assertThrows(classOf[ConfigException],
315-
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
315+
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
316316
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
317317
assertThrows(classOf[ConfigException],
318-
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
318+
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
319319
}
320320

321321
@ParameterizedTest(name = "testEnableRemoteLogStorage with sysRemoteStorageEnabled: {0}")
@@ -328,10 +328,10 @@ class LogConfigTest {
328328
val logProps = new Properties()
329329
logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
330330
if (sysRemoteStorageEnabled) {
331-
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
331+
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
332332
} else {
333333
val message = assertThrows(classOf[ConfigException],
334-
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
334+
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
335335
assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
336336
}
337337
}
@@ -348,7 +348,7 @@ class LogConfigTest {
348348
if (wasRemoteStorageEnabled) {
349349
val message = assertThrows(classOf[InvalidConfigurationException],
350350
() => LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
351-
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false))
351+
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
352352
assertTrue(message.getMessage.contains("It is invalid to disable remote storage without deleting remote data. " +
353353
"If you want to keep the remote data and turn to read only, please set `remote.storage.enable=true,remote.log.copy.disable=true`. " +
354354
"If you want to disable remote storage and delete all remote data, please set `remote.storage.enable=false,remote.log.delete.on.disable=true`."))
@@ -357,11 +357,11 @@ class LogConfigTest {
357357
// It should be able to disable the remote log storage when delete on disable is set to true
358358
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true")
359359
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
360-
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
360+
logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
361361
} else {
362-
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
362+
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
363363
LogConfig.validate(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"), logProps,
364-
kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
364+
kafkaConfig.extractLogConfigMap, kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
365365
}
366366
}
367367

@@ -381,11 +381,11 @@ class LogConfigTest {
381381
if (sysRemoteStorageEnabled) {
382382
val message = assertThrows(classOf[ConfigException],
383383
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
384-
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
384+
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
385385
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG))
386386
} else {
387387
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
388-
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
388+
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
389389
}
390390
}
391391

@@ -405,11 +405,11 @@ class LogConfigTest {
405405
if (sysRemoteStorageEnabled) {
406406
val message = assertThrows(classOf[ConfigException],
407407
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
408-
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true))
408+
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled()))
409409
assertTrue(message.getMessage.contains(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG))
410410
} else {
411411
LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap,
412-
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), true)
412+
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled())
413413
}
414414
}
415415

@@ -447,21 +447,6 @@ class LogConfigTest {
447447
LogConfig.validate(logProps)
448448
}
449449

450-
@ParameterizedTest
451-
@ValueSource(strings = Array(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG))
452-
def testInValidRemoteConfigsInZK(configKey: String): Unit = {
453-
val kafkaProps = TestUtils.createDummyBrokerConfig()
454-
kafkaProps.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
455-
val kafkaConfig = KafkaConfig.fromProps(kafkaProps)
456-
val logProps = new Properties
457-
logProps.put(configKey, "true")
458-
459-
val message = assertThrows(classOf[InvalidConfigurationException],
460-
() => LogConfig.validate(Collections.emptyMap(), logProps, kafkaConfig.extractLogConfigMap, true, true))
461-
assertTrue(message.getMessage.contains("It is invalid to set `remote.log.delete.on.disable` or " +
462-
"`remote.log.copy.disable` under Zookeeper's mode."))
463-
}
464-
465450
@Test
466451
def testValidateWithMetadataVersionJbodSupport(): Unit = {
467452
def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =

storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java

+4-19
Original file line numberDiff line numberDiff line change
@@ -512,17 +512,12 @@ public static void validateBrokerLogConfigValues(Map<?, ?> props,
512512
* @param existingConfigs The existing properties
513513
* @param newConfigs The new properties to be validated
514514
* @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled
515-
* @param fromZK true if this is a ZK cluster
516515
*/
517516
private static void validateTopicLogConfigValues(Map<String, String> existingConfigs,
518517
Map<?, ?> newConfigs,
519-
boolean isRemoteLogStorageSystemEnabled,
520-
boolean fromZK) {
518+
boolean isRemoteLogStorageSystemEnabled) {
521519
validateValues(newConfigs);
522520

523-
if (fromZK) {
524-
validateNoInvalidRemoteStorageConfigsInZK(newConfigs);
525-
}
526521
boolean isRemoteLogStorageEnabled = (Boolean) newConfigs.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
527522
if (isRemoteLogStorageEnabled) {
528523
validateRemoteStorageOnlyIfSystemEnabled(newConfigs, isRemoteLogStorageSystemEnabled, false);
@@ -564,15 +559,6 @@ public static void validateRetentionConfigsWhenRemoteCopyDisabled(Map<?, ?> newC
564559
}
565560
}
566561

567-
public static void validateNoInvalidRemoteStorageConfigsInZK(Map<?, ?> newConfigs) {
568-
boolean isRemoteLogDeleteOnDisable = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, false);
569-
boolean isRemoteLogCopyDisabled = (Boolean) Utils.castToStringObjectMap(newConfigs).getOrDefault(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, false);
570-
if (isRemoteLogDeleteOnDisable || isRemoteLogCopyDisabled) {
571-
throw new InvalidConfigurationException("It is invalid to set `remote.log.delete.on.disable` or " +
572-
"`remote.log.copy.disable` under Zookeeper's mode.");
573-
}
574-
}
575-
576562
public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boolean isRemoteLogStorageSystemEnabled, boolean isReceivingConfigFromStore) {
577563
boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
578564
if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) {
@@ -630,14 +616,13 @@ private static void validateRemoteStorageRetentionTime(Map<?, ?> props) {
630616
* Check that the given properties contain only valid log config names and that all values can be parsed and are valid
631617
*/
632618
public static void validate(Properties props) {
633-
validate(Collections.emptyMap(), props, Collections.emptyMap(), false, false);
619+
validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
634620
}
635621

636622
public static void validate(Map<String, String> existingConfigs,
637623
Properties props,
638624
Map<?, ?> configuredProps,
639-
boolean isRemoteLogStorageSystemEnabled,
640-
boolean fromZK) {
625+
boolean isRemoteLogStorageSystemEnabled) {
641626
validateNames(props);
642627
if (configuredProps == null || configuredProps.isEmpty()) {
643628
Map<?, ?> valueMaps = CONFIG.parse(props);
@@ -646,7 +631,7 @@ public static void validate(Map<String, String> existingConfigs,
646631
Map<Object, Object> combinedConfigs = new HashMap<>(configuredProps);
647632
combinedConfigs.putAll(props);
648633
Map<?, ?> valueMaps = CONFIG.parse(combinedConfigs);
649-
validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled, fromZK);
634+
validateTopicLogConfigValues(existingConfigs, valueMaps, isRemoteLogStorageSystemEnabled);
650635
}
651636
}
652637

0 commit comments

Comments
 (0)