Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18576: Convert ConfigType to Enum #18711

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 71 additions & 63 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ object ConfigCommand extends Logging {
val BrokerLoggerConfigType = "broker-loggers"
private val BrokerSupportedConfigTypes = ConfigType.ALL.asScala :+ BrokerLoggerConfigType
private val DefaultScramIterations = 4096
private val TopicType = ConfigType.TOPIC.value
private val ClientMetricsType = ConfigType.CLIENT_METRICS.value
private val BrokerType = ConfigType.BROKER.value
private val GroupType = ConfigType.GROUP.value
private val UserType = ConfigType.USER.value
private val ClientType = ConfigType.CLIENT.value
private val IpType = ConfigType.IP.value

def main(args: Array[String]): Unit = {
try {
Expand Down Expand Up @@ -171,12 +178,13 @@ object ConfigCommand extends Logging {
val configsToBeDeleted = parseConfigsToBeDeleted(opts)

entityTypeHead match {
case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER | ConfigType.GROUP =>
case TopicType | ClientMetricsType | BrokerType | GroupType =>
val configResourceType = entityTypeHead match {
case ConfigType.TOPIC => ConfigResource.Type.TOPIC
case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
case ConfigType.BROKER => ConfigResource.Type.BROKER
case ConfigType.GROUP => ConfigResource.Type.GROUP
case TopicType => ConfigResource.Type.TOPIC
case ClientMetricsType => ConfigResource.Type.CLIENT_METRICS
case BrokerType => ConfigResource.Type.BROKER
case GroupType => ConfigResource.Type.GROUP
case _ => throw new IllegalArgumentException(s"$entityNameHead is not a valid entity-type.")
}
try {
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)
Expand Down Expand Up @@ -205,29 +213,29 @@ object ConfigCommand extends Logging {
val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

case ConfigType.USER | ConfigType.CLIENT =>
case UserType | ClientType =>
val hasQuotaConfigsToAdd = configsToBeAdded.keys.exists(QuotaConfig.isClientOrUserQuotaConfig)
val scramConfigsToAddMap = configsToBeAdded.filter(entry => ScramMechanism.isScram(entry._1))
val unknownConfigsToAdd = configsToBeAdded.keys.filterNot(key => ScramMechanism.isScram(key) || QuotaConfig.isClientOrUserQuotaConfig(key))
val hasQuotaConfigsToDelete = configsToBeDeleted.exists(QuotaConfig.isClientOrUserQuotaConfig)
val scramConfigsToDelete = configsToBeDeleted.filter(ScramMechanism.isScram)
val unknownConfigsToDelete = configsToBeDeleted.filterNot(key => ScramMechanism.isScram(key) || QuotaConfig.isClientOrUserQuotaConfig(key))
if (entityTypeHead == ConfigType.CLIENT || entityTypes.size == 2) { // size==2 for case where users is specified first on the command line, before clients
if (entityTypeHead == ClientType || entityTypes.size == 2) { // size==2 for case where users is specified first on the command line, before clients
// either just a client or both a user and a client
if (unknownConfigsToAdd.nonEmpty || scramConfigsToAddMap.nonEmpty)
throw new IllegalArgumentException(s"Only quota configs can be added for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
throw new IllegalArgumentException(s"Only quota configs can be added for '$ClientType' using --bootstrap-server. Unexpected config names: ${unknownConfigsToAdd ++ scramConfigsToAddMap.keys}")
if (unknownConfigsToDelete.nonEmpty || scramConfigsToDelete.nonEmpty)
throw new IllegalArgumentException(s"Only quota configs can be deleted for '${ConfigType.CLIENT}' using --bootstrap-server. Unexpected config names: ${unknownConfigsToDelete ++ scramConfigsToDelete}")
throw new IllegalArgumentException(s"Only quota configs can be deleted for '$ClientType' using --bootstrap-server. Unexpected config names: ${unknownConfigsToDelete ++ scramConfigsToDelete}")
} else { // ConfigType.User
if (unknownConfigsToAdd.nonEmpty)
throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be added for '${ConfigType.USER}' using --bootstrap-server. Unexpected config names: $unknownConfigsToAdd")
throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be added for '$UserType' using --bootstrap-server. Unexpected config names: $unknownConfigsToAdd")
if (unknownConfigsToDelete.nonEmpty)
throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be deleted for '${ConfigType.USER}' using --bootstrap-server. Unexpected config names: $unknownConfigsToDelete")
throw new IllegalArgumentException(s"Only quota and SCRAM credential configs can be deleted for '$UserType' using --bootstrap-server. Unexpected config names: $unknownConfigsToDelete")
if (scramConfigsToAddMap.nonEmpty || scramConfigsToDelete.nonEmpty) {
if (entityNames.exists(_.isEmpty)) // either --entity-type users --entity-default or --user-defaults
throw new IllegalArgumentException("The use of --entity-default or --user-defaults is not allowed with User SCRAM Credentials using --bootstrap-server.")
if (hasQuotaConfigsToAdd || hasQuotaConfigsToDelete)
throw new IllegalArgumentException(s"Cannot alter both quota and SCRAM credential configs simultaneously for '${ConfigType.USER}' using --bootstrap-server.")
throw new IllegalArgumentException(s"Cannot alter both quota and SCRAM credential configs simultaneously for '$UserType' using --bootstrap-server.")
}
}

Expand All @@ -241,10 +249,10 @@ object ConfigCommand extends Logging {
alterUserScramCredentialConfigs(adminClient, entityNames.head, scramConfigsToAddMap, scramConfigsToDelete)
}

case ConfigType.IP =>
case IpType =>
val unknownConfigs = (configsToBeAdded.keys ++ configsToBeDeleted).filterNot(key => DynamicConfig.Ip.names.contains(key))
if (unknownConfigs.nonEmpty)
throw new IllegalArgumentException(s"Only connection quota configs can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
throw new IllegalArgumentException(s"Only connection quota configs can be added for '$IpType' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)

case _ =>
Expand Down Expand Up @@ -290,9 +298,9 @@ object ConfigCommand extends Logging {
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")

val alterEntityTypes = entityTypes.map {
case ConfigType.USER => ClientQuotaEntity.USER
case ConfigType.CLIENT => ClientQuotaEntity.CLIENT_ID
case ConfigType.IP => ClientQuotaEntity.IP
case UserType => ClientQuotaEntity.USER
case ClientType => ClientQuotaEntity.CLIENT_ID
case IpType => ClientQuotaEntity.IP
case entType => throw new IllegalArgumentException(s"Unexpected entity type: $entType")
}
val alterEntityNames = entityNames.map(en => if (en.nonEmpty) en else null)
Expand Down Expand Up @@ -321,11 +329,11 @@ object ConfigCommand extends Logging {
val describeAll = opts.options.has(opts.allOpt)

entityTypes.head match {
case ConfigType.TOPIC | ConfigType.BROKER | BrokerLoggerConfigType | ConfigType.CLIENT_METRICS | ConfigType.GROUP =>
case TopicType | BrokerType | BrokerLoggerConfigType | ClientMetricsType | GroupType =>
describeResourceConfig(adminClient, entityTypes.head, entityNames.headOption, describeAll)
case ConfigType.USER | ConfigType.CLIENT =>
case UserType | ClientType =>
describeClientQuotaAndUserScramCredentialConfigs(adminClient, entityTypes, entityNames)
case ConfigType.IP =>
case IpType =>
describeQuotaConfigs(adminClient, entityTypes, entityNames)
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
}
Expand All @@ -335,13 +343,13 @@ object ConfigCommand extends Logging {
val entities = entityName
.map(name => List(name))
.getOrElse(entityType match {
case ConfigType.TOPIC =>
case TopicType =>
adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
case ConfigType.BROKER | BrokerLoggerConfigType =>
case BrokerType | BrokerLoggerConfigType =>
adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName
case ConfigType.CLIENT_METRICS =>
case ClientMetricsType =>
adminClient.listClientMetricsResources().all().get().asScala.map(_.name).toSeq
case ConfigType.GROUP =>
case GroupType =>
adminClient.listConsumerGroups().all.get.asScala.map(_.groupId).toSeq
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
})
Expand Down Expand Up @@ -385,11 +393,11 @@ object ConfigCommand extends Logging {
}

val (configResourceType, dynamicConfigSource) = entityType match {
case ConfigType.TOPIC =>
case TopicType =>
if (entityName.nonEmpty)
Topic.validate(entityName)
(ConfigResource.Type.TOPIC, Some(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG))
case ConfigType.BROKER => entityName match {
case BrokerType => entityName match {
case BrokerDefaultEntityName =>
(ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
case _ =>
Expand All @@ -400,9 +408,9 @@ object ConfigCommand extends Logging {
if (entityName.nonEmpty)
validateBrokerId()
(ConfigResource.Type.BROKER_LOGGER, None)
case ConfigType.CLIENT_METRICS =>
case ClientMetricsType =>
(ConfigResource.Type.CLIENT_METRICS, Some(ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG))
case ConfigType.GROUP =>
case GroupType =>
(ConfigResource.Type.GROUP, Some(ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG))
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
}
Expand Down Expand Up @@ -451,7 +459,7 @@ object ConfigCommand extends Logging {
describeQuotaConfigs(adminClient, entityTypes, entityNames)
// we describe user SCRAM credentials only when we are not describing client information
// and we are not given either --entity-default or --user-defaults
if (!entityTypes.contains(ConfigType.CLIENT) && !entityNames.contains("")) {
if (!entityTypes.contains(ClientType) && !entityNames.contains("")) {
val result = adminClient.describeUserScramCredentials(entityNames.asJava)
result.users.get(30, TimeUnit.SECONDS).asScala.foreach(user => {
try {
Expand All @@ -474,9 +482,9 @@ object ConfigCommand extends Logging {
private def getAllClientQuotasConfigs(adminClient: Admin, entityTypes: List[String], entityNames: List[String]) = {
val components = entityTypes.map(Some(_)).zipAll(entityNames.map(Some(_)), None, None).map { case (entityTypeOpt, entityNameOpt) =>
val entityType = entityTypeOpt match {
case Some(ConfigType.USER) => ClientQuotaEntity.USER
case Some(ConfigType.CLIENT) => ClientQuotaEntity.CLIENT_ID
case Some(ConfigType.IP) => ClientQuotaEntity.IP
case Some(UserType) => ClientQuotaEntity.USER
case Some(ClientType) => ClientQuotaEntity.CLIENT_ID
case Some(IpType) => ClientQuotaEntity.IP
case Some(_) => throw new IllegalArgumentException(s"Unexpected entity type ${entityTypeOpt.get}")
case None => throw new IllegalArgumentException("More entity names specified than entity types")
}
Expand Down Expand Up @@ -519,14 +527,14 @@ object ConfigCommand extends Logging {

private val nl: String = System.lineSeparator()
val addConfig: OptionSpec[String] = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " +
"For entity-type '" + ConfigType.TOPIC + "': " + LogConfig.configNames.asScala.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.BROKER + "': " + DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.USER + "': " + DynamicConfig.User.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.CLIENT + "': " + DynamicConfig.Client.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.IP + "': " + DynamicConfig.Ip.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.CLIENT_METRICS + "': " + DynamicConfig.ClientMetrics.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ConfigType.GROUP + "': " + DynamicConfig.Group.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
s"Entity types '${ConfigType.USER}' and '${ConfigType.CLIENT}' may be specified together to update config for clients of a specific user.")
"For entity-type '" + TopicType + "': " + LogConfig.configNames.asScala.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + BrokerType + "': " + DynamicConfig.Broker.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + UserType + "': " + DynamicConfig.User.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ClientType + "': " + DynamicConfig.Client.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + IpType + "': " + DynamicConfig.Ip.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + ClientMetricsType + "': " + DynamicConfig.ClientMetrics.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
"For entity-type '" + GroupType + "': " + DynamicConfig.Group.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
s"Entity types '$UserType' and '$ClientType' may be specified together to update config for clients of a specific user.")
.withRequiredArg
.ofType(classOf[String])
val addConfigFile: OptionSpec[String] = parser.accepts("add-config-file", "Path to a properties file with configs to add. See add-config for a list of valid configurations.")
Expand Down Expand Up @@ -567,19 +575,19 @@ object ConfigCommand extends Logging {
.ofType(classOf[String])
options = parser.parse(args : _*)

private val entityFlags = List((topic, ConfigType.TOPIC),
(client, ConfigType.CLIENT),
(user, ConfigType.USER),
(broker, ConfigType.BROKER),
private val entityFlags = List((topic, TopicType),
(client, ClientType),
(user, UserType),
(broker, BrokerType),
(brokerLogger, BrokerLoggerConfigType),
(ip, ConfigType.IP),
(clientMetrics, ConfigType.CLIENT_METRICS),
(group, ConfigType.GROUP))
(ip, IpType),
(clientMetrics, ClientMetricsType),
(group, GroupType))

private val entityDefaultsFlags = List((clientDefaults, ConfigType.CLIENT),
(userDefaults, ConfigType.USER),
(brokerDefaults, ConfigType.BROKER),
(ipDefaults, ConfigType.IP))
private val entityDefaultsFlags = List((clientDefaults, ClientType),
(userDefaults, UserType),
(brokerDefaults, BrokerType),
(ipDefaults, IpType))

private[admin] def entityTypes: List[String] = {
options.valuesOf(entityType).asScala.toList ++
Expand Down Expand Up @@ -625,8 +633,8 @@ object ConfigCommand extends Logging {
)
if (entityTypeVals.isEmpty)
throw new IllegalArgumentException("At least one entity type must be specified")
else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(ConfigType.USER, ConfigType.CLIENT)))
throw new IllegalArgumentException(s"Only '${ConfigType.USER}' and '${ConfigType.CLIENT}' entity types may be specified together")
else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(UserType, ClientType)))
throw new IllegalArgumentException(s"Only '$UserType' and '$ClientType' entity types may be specified together")

if ((options.has(entityName) || options.has(entityType) || options.has(entityDefault)) &&
(entityFlags ++ entityDefaultsFlags).exists(entity => options.has(entity._1)))
Expand All @@ -639,7 +647,7 @@ object ConfigCommand extends Logging {
(if (options.has(bootstrapControllerOpt)) 1 else 0)
if (numConnectOptions > 1)
throw new IllegalArgumentException("Only one of --bootstrap-server or --bootstrap-controller can be specified")
if (hasEntityName && (entityTypeVals.contains(ConfigType.BROKER) || entityTypeVals.contains(BrokerLoggerConfigType))) {
if (hasEntityName && (entityTypeVals.contains(BrokerType) || entityTypeVals.contains(BrokerLoggerConfigType))) {
Seq(entityName, broker, brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId =>
try brokerId.toInt catch {
case _: NumberFormatException =>
Expand All @@ -648,18 +656,18 @@ object ConfigCommand extends Logging {
}
}

if (hasEntityName && entityTypeVals.contains(ConfigType.IP)) {
if (hasEntityName && entityTypeVals.contains(IpType)) {
Seq(entityName, ip).filter(options.has(_)).map(options.valueOf(_)).foreach { ipEntity =>
if (!isValidIpEntity(ipEntity))
throw new IllegalArgumentException(s"The entity name for ${entityTypeVals.head} must be a valid IP or resolvable host, but it is: $ipEntity")
}
}

if (options.has(describeOpt)) {
if (!(entityTypeVals.contains(ConfigType.USER) ||
entityTypeVals.contains(ConfigType.CLIENT) ||
entityTypeVals.contains(ConfigType.BROKER) ||
entityTypeVals.contains(ConfigType.IP)) && options.has(entityDefault)) {
if (!(entityTypeVals.contains(UserType) ||
entityTypeVals.contains(ClientType) ||
entityTypeVals.contains(BrokerType) ||
entityTypeVals.contains(IpType)) && options.has(entityDefault)) {
throw new IllegalArgumentException(s"--entity-default must not be specified with --describe of ${entityTypeVals.mkString(",")}")
}

Expand All @@ -668,10 +676,10 @@ object ConfigCommand extends Logging {
}

if (options.has(alterOpt)) {
if (entityTypeVals.contains(ConfigType.USER) ||
entityTypeVals.contains(ConfigType.CLIENT) ||
entityTypeVals.contains(ConfigType.BROKER) ||
entityTypeVals.contains(ConfigType.IP)) {
if (entityTypeVals.contains(UserType) ||
entityTypeVals.contains(ClientType) ||
entityTypeVals.contains(BrokerType) ||
entityTypeVals.contains(IpType)) {
if (!hasEntityName && !hasEntityDefault)
throw new IllegalArgumentException("An entity-name or default entity must be specified with --alter of users, clients, brokers or ips")
} else if (!hasEntityName)
Expand Down
Loading