Skip to content

Commit

Permalink
KAFKA-18576: Convert ConfigType to Enum
Browse files Browse the repository at this point in the history
JIRA: KAFKA-18576
After removing ZooKeeper, we no longer need to exclude `client_metrics`
and `group` from `ConfigType#ALL`.

Since it's a common pattern to provide a mechanism to know all values in
enumeration ( Java enum provides ootb), we should convert ConfigType to
enum.
  • Loading branch information
frankvicky committed Feb 12, 2025
1 parent 400363b commit abfab57
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 104 deletions.
134 changes: 71 additions & 63 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ object ConfigCommand extends Logging {
val BrokerLoggerConfigType = "broker-loggers"
private val BrokerSupportedConfigTypes = ConfigType.ALL.asScala :+ BrokerLoggerConfigType
private val DefaultScramIterations = 4096
private val TopicType = ConfigType.TOPIC.value
private val ClientMetricsType = ConfigType.CLIENT_METRICS.value
private val BrokerType = ConfigType.BROKER.value
private val GroupType = ConfigType.GROUP.value
private val UserType = ConfigType.USER.value
private val ClientType = ConfigType.CLIENT.value
private val IpType = ConfigType.IP.value

def main(args: Array[String]): Unit = {
try {
Expand Down Expand Up @@ -171,12 +178,13 @@ object ConfigCommand extends Logging {
val configsToBeDeleted = parseConfigsToBeDeleted(opts)

entityTypeHead match {
case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER | ConfigType.GROUP =>
case TopicType | ClientMetricsType | BrokerType | GroupType =>
val configResourceType = entityTypeHead match {
case ConfigType.TOPIC => ConfigResource.Type.TOPIC
case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
case ConfigType.BROKER => ConfigResource.Type.BROKER
case ConfigType.GROUP => ConfigResource.Type.GROUP
case TopicType => ConfigResource.Type.TOPIC
case ClientMetricsType => ConfigResource.Type.CLIENT_METRICS
case BrokerType => ConfigResource.Type.BROKER
case GroupType => ConfigResource.Type.GROUP
case _ => throw new IllegalArgumentException(s"$entityNameHead is not a valid entity-type.")
}
try {
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)
Expand Down Expand Up @@ -205,29 +213,29 @@ object ConfigCommand extends Logging {
val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

case ConfigType.USER | ConfigType.CLIENT =>
case UserType | ClientType =>
val hasQuotaConfigsToAdd = configsToBeAdded.keys.exists(QuotaConfig.isClientOrUserQuotaConfig)
val scramConfigsToAddMap = configsToBeAdded.filter(entry => ScramMechanism.isScram(entry._1))
val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key => ScramMechanism.isScram(key) || QuotaConfig.isClientOrUserQuotaConfig(key))
val hasQuotaConfigsToDelete = configsToBeDeleted.exists(QuotaConfig.isClientOrUserQuotaConfig)
val scramConfigsToDelete = configsToBeDeleted.filter(ScramMechanism.isScram)
val unknownConfigsToDelete = configsToBeDeleted.filterNot(key => ScramMechanism.isScram(key) || QuotaConfig.isClientOrUserQuotaConfig(key))
if (entityTypeHead == ConfigType.CLIENT || entityTypes.size == 2) { // size==2 for case where users is specified first on the command line, before clients
if (entityTypeHead == ClientType || entityTypes.size == 2) { // size==2 for case where users is specified first on the command line, before clients
// either just a client or both a user and a client
if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
throw new IllegalArgumentException(s"Only quota configs can be added for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
throw new IllegalArgumentException(s"Only quota configs can be added for '$ClientType' using --bootstrap-server. Unexpected config names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
throw new IllegalArgumentException(s"Only quota configs can be deleted for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names: ${unknownConfigsToDelete ++ scramConfigsToDelete}")
throw new IllegalArgumentException(s"Only quota configs can be deleted for '$ClientType' using --bootstrap-server. Unexpected config names: ${unknownConfigsToDelete ++ scramConfigsToDelete}")
} else { // ConfigType.User
if (unknownConfigsToAdd.nonEmpty)
throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be added for '${ConfigType.USER}' using --bootstrap-server. Unexpected config names: $unknownConfigsToAdd")
throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be added for '$UserType' using --bootstrap-server. Unexpected config names: $unknownConfigsToAdd")
if (unknownConfigsToDelete.nonEmpty)
throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be deleted for '${ConfigType.USER}' using --bootstrap-server. Unexpected config names: $unknownConfigsToDelete")
throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be deleted for '$UserType' using --bootstrap-server. Unexpected config names: $unknownConfigsToDelete")
if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
if (entityNames.exists(_.isEmpty)) // either --entity-type users --entity-default or --user-defaults
throw new IllegalArgumentException("The use of --entity-default or --user-defaults is not allowed with User SCRAM Credentials using --bootstrap-server.")
if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
throw new IllegalArgumentException(s"Cannot alter both quota and SCRAM credential configs simultaneously for '${ConfigType.USER}' using --bootstrap-server.")
throw new IllegalArgumentException(s"Cannot alter both quota and SCRAM credential configs simultaneously for '$UserType' using --bootstrap-server.")
}
}

Expand All @@ -241,10 +249,10 @@ object ConfigCommand extends Logging {
alterUserScramCredentialConfigs(adminClient, entityNames.head, scramConfigsToAddMap, scramConfigsToDelete)
}

case ConfigType.IP =>
case IpType =>
val unknownConfigs = (configsToBeAdded.keys ++ configsToBeDeleted).filterNot(key => DynamicConfig.Ip.names.contains(key))
if (unknownConfigs.nonEmpty)
throw new IllegalArgumentException(s"Only connection quota configs can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
throw new IllegalArgumentException(s"Only connection quota configs can be added for '$IpType' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)

case _ =>
Expand Down Expand Up @@ -290,9 +298,9 @@ object ConfigCommand extends Logging {
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")

val alterEntityTypes = entityTypes.map {
case ConfigType.USER => ClientQuotaEntity.USER
case ConfigType.CLIENT => ClientQuotaEntity.CLIENT_ID
case ConfigType.IP => ClientQuotaEntity.IP
case UserType => ClientQuotaEntity.USER
case ClientType => ClientQuotaEntity.CLIENT_ID
case IpType => ClientQuotaEntity.IP
case entType => throw new IllegalArgumentException(s"Unexpected entity type: $entType")
}
val alterEntityNames = entityNames.map(en => if (en.nonEmpty) en else null)
Expand Down Expand Up @@ -321,11 +329,11 @@ object ConfigCommand extends Logging {
val describeAll = opts.options.has(opts.allOpt)

entityTypes.head match {
case ConfigType.TOPIC | ConfigType.BROKER | BrokerLoggerConfigType | ConfigType.CLIENT_METRICS | ConfigType.GROUP =>
case TopicType | BrokerType | BrokerLoggerConfigType | ClientMetricsType | GroupType =>
describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll)
case ConfigType.USER | ConfigType.CLIENT =>
case UserType | ClientType =>
describeClientQuotaAndUserScramCredentialConfigs(adminClient, entityTypes, entityNames)
case ConfigType.IP =>
case IpType =>
describeQuotaConfigs(adminClient, entityTypes, entityNames)
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
}
Expand All @@ -335,13 +343,13 @@ object ConfigCommand extends Logging {
val entities = entityName
.map(name => List(name))
.getOrElse(entityType match {
case ConfigType.TOPIC =>
case TopicType =>
adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
case ConfigType.BROKER | BrokerLoggerConfigType =>
case BrokerType | BrokerLoggerConfigType =>
adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName
case ConfigType.CLIENT_METRICS =>
case ClientMetricsType =>
adminClient.listClientMetricsResources().all().get().asScala.map(_.name).toSeq
case ConfigType.GROUP =>
case GroupType =>
adminClient.listConsumerGroups().all.get.asScala.map(_.groupId).toSeq
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
})
Expand Down Expand Up @@ -385,11 +393,11 @@ object ConfigCommand extends Logging {
}

val (configResourceType, dynamicConfigSource) = entityType match {
case ConfigType.TOPIC =>
case TopicType =>
if (entityName.nonEmpty)
Topic.validate(entityName)
(ConfigResource.Type.TOPIC, Some(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG))
case ConfigType.BROKER => entityName match {
case BrokerType => entityName match {
case BrokerDefaultEntityName =>
(ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
case _ =>
Expand All @@ -400,9 +408,9 @@ object ConfigCommand extends Logging {
if (entityName.nonEmpty)
validateBrokerId()
(ConfigResource.Type.BROKER_LOGGER, None)
case ConfigType.CLIENT_METRICS =>
case ClientMetricsType =>
(ConfigResource.Type.CLIENT_METRICS, Some(ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG))
case ConfigType.GROUP =>
case GroupType =>
(ConfigResource.Type.GROUP, Some(ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG))
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
}
Expand Down Expand Up @@ -451,7 +459,7 @@ object ConfigCommand extends Logging {
describeQuotaConfigs(adminClient, entityTypes, entityNames)
// we describe user SCRAM credentials only when we are not describing client information
// and we are not given either --entity-default or --user-defaults
if (!entityTypes.contains(ConfigType.CLIENT) && !entityNames.contains("")) {
if (!entityTypes.contains(ClientType) && !entityNames.contains("")) {
val result = adminClient.describeUserScramCredentials(entityNames.asJava)
result.users.get(30, TimeUnit.SECONDS).asScala.foreach(user => {
try {
Expand All @@ -474,9 +482,9 @@ object ConfigCommand extends Logging {
private def getAllClientQuotasConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]) = {
val components = entityTypes.map(Some(_)).zipAll(entityNames.map(Some(_)), None, None).map { case (entityTypeOpt, entityNameOpt) =>
val entityType = entityTypeOpt match {
case Some(ConfigType.USER) => ClientQuotaEntity.USER
case Some(ConfigType.CLIENT) => ClientQuotaEntity.CLIENT_ID
case Some(ConfigType.IP) => ClientQuotaEntity.IP
case Some(UserType) => ClientQuotaEntity.USER
case Some(ClientType) => ClientQuotaEntity.CLIENT_ID
case Some(IpType) => ClientQuotaEntity.IP
case Some(_) => throw new IllegalArgumentException(s"Unexpected entity type ${entityTypeOpt.get}")
case None => throw new IllegalArgumentException("More entity names specified than entity types")
}
Expand Down Expand Up @@ -519,14 +527,14 @@ object ConfigCommand extends Logging {

private val nl: String = System.lineSeparator()
val addConfig: OptionSpec[String] = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
"For entity-type '" + ConfigType.TOPIC + "': " + LogConfig.configNames.asScala.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.BROKER + "': " + DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.USER + "': " + DynamicConfig.User.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.CLIENT + "': " + DynamicConfig.Client.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.IP + "': " + DynamicConfig.Ip.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.CLIENT_METRICS + "': " + DynamicConfig.ClientMetrics.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.GROUP + "': " + DynamicConfig.Group.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
s"Entity types '${ConfigType.USER}' and '${ConfigType.CLIENT}' may be specified together to update config for clients of a specific user.")
"For entity-type '" + TopicType + "': " + LogConfig.configNames.asScala.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + BrokerType + "': " + DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + UserType + "': " + DynamicConfig.User.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ClientType + "': " + DynamicConfig.Client.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + IpType + "': " + DynamicConfig.Ip.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ClientMetricsType + "': " + DynamicConfig.ClientMetrics.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + GroupType + "': " + DynamicConfig.Group.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
s"Entity types '$UserType' and '$ClientType' may be specified together to update config for clients of a specific user.")
.withRequiredArg
.ofType(classOf[String])
val addConfigFile: OptionSpec[String] = parser.accepts("add-config-file", "Path to a properties file with configs to add. See add-config for a list of valid configurations.")
Expand Down Expand Up @@ -567,19 +575,19 @@ object ConfigCommand extends Logging {
.ofType(classOf[String])
options = parser.parse(args : _*)

private val entityFlags = List((topic, ConfigType.TOPIC),
(client, ConfigType.CLIENT),
(user, ConfigType.USER),
(broker, ConfigType.BROKER),
private val entityFlags = List((topic, TopicType),
(client, ClientType),
(user, UserType),
(broker, BrokerType),
(brokerLogger, BrokerLoggerConfigType),
(ip, ConfigType.IP),
(clientMetrics, ConfigType.CLIENT_METRICS),
(group, ConfigType.GROUP))
(ip, IpType),
(clientMetrics, ClientMetricsType),
(group, GroupType))

private val entityDefaultsFlags = List((clientDefaults, ConfigType.CLIENT),
(userDefaults, ConfigType.USER),
(brokerDefaults, ConfigType.BROKER),
(ipDefaults, ConfigType.IP))
private val entityDefaultsFlags = List((clientDefaults, ClientType),
(userDefaults, UserType),
(brokerDefaults, BrokerType),
(ipDefaults, IpType))

private[admin] def entityTypes: List[String] = {
options.valuesOf(entityType).asScala.toList ++
Expand Down Expand Up @@ -625,8 +633,8 @@ object ConfigCommand extends Logging {
)
if (entityTypeVals.isEmpty)
throw new IllegalArgumentException("At least one entity type must be specified")
else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(ConfigType.USER, ConfigType.CLIENT)))
throw new IllegalArgumentException(s"Only '${ConfigType.USER}' and '${ConfigType.CLIENT}' entity types may be specified together")
else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(UserType, ClientType)))
throw new IllegalArgumentException(s"Only '$UserType' and '$ClientType' entity types may be specified together")

if ((options.has(entityName) || options.has(entityType) || options.has(entityDefault)) &&
(entityFlags ++ entityDefaultsFlags).exists(entity => options.has(entity._1)))
Expand All @@ -639,7 +647,7 @@ object ConfigCommand extends Logging {
(if (options.has(bootstrapControllerOpt)) 1 else 0)
if (numConnectOptions > 1)
throw new IllegalArgumentException("Only one of --bootstrap-server or --bootstrap-controller can be specified")
if (hasEntityName && (entityTypeVals.contains(ConfigType.BROKER) || entityTypeVals.contains(BrokerLoggerConfigType))) {
if (hasEntityName && (entityTypeVals.contains(BrokerType) || entityTypeVals.contains(BrokerLoggerConfigType))) {
Seq(entityName, broker, brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId =>
try brokerId.toInt catch {
case _: NumberFormatException =>
Expand All @@ -648,18 +656,18 @@ object ConfigCommand extends Logging {
}
}

if (hasEntityName && entityTypeVals.contains(ConfigType.IP)) {
if (hasEntityName && entityTypeVals.contains(IpType)) {
Seq(entityName, ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipEntity =>
if (!isValidIpEntity(ipEntity))
throw new IllegalArgumentException(s"The entity name for ${entityTypeVals.head} must be a valid IP or resolvable host, but it is: $ipEntity")
}
}

if (options.has(describeOpt)) {
if (!(entityTypeVals.contains(ConfigType.USER) ||
entityTypeVals.contains(ConfigType.CLIENT) ||
entityTypeVals.contains(ConfigType.BROKER) ||
entityTypeVals.contains(ConfigType.IP)) && options.has(entityDefault)) {
if (!(entityTypeVals.contains(UserType) ||
entityTypeVals.contains(ClientType) ||
entityTypeVals.contains(BrokerType) ||
entityTypeVals.contains(IpType)) && options.has(entityDefault)) {
throw new IllegalArgumentException(s"--entity-default must not be specified with --describe of ${entityTypeVals.mkString(",")}")
}

Expand All @@ -668,10 +676,10 @@ object ConfigCommand extends Logging {
}

if (options.has(alterOpt)) {
if (entityTypeVals.contains(ConfigType.USER) ||
entityTypeVals.contains(ConfigType.CLIENT) ||
entityTypeVals.contains(ConfigType.BROKER) ||
entityTypeVals.contains(ConfigType.IP)) {
if (entityTypeVals.contains(UserType) ||
entityTypeVals.contains(ClientType) ||
entityTypeVals.contains(BrokerType) ||
entityTypeVals.contains(IpType)) {
if (!hasEntityName && !hasEntityDefault)
throw new IllegalArgumentException("An entity-name or default entity must be specified with --alter of users, clients, brokers or ips")
} else if (!hasEntityName)
Expand Down
Loading

0 comments on commit abfab57

Please sign in to comment.