Skip to content

Commit 398b4c4

Browse files
committed
KAFKA-17868: Do not ignore --feature flag in kafka-storage.sh (#17597)
Reviewers: Chia-Ping Tsai <[email protected]>, Justine Olshan <[email protected]>
1 parent c821449 commit 398b4c4

File tree

2 files changed

+74
-0
lines changed

2 files changed

+74
-0
lines changed

core/src/main/scala/kafka/tools/StorageTool.scala

+28
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ object StorageTool extends Logging {
124124
case None => Option(config.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).
125125
foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString)))
126126
}
127+
Option(namespace.getList[String]("feature")).foreach(
128+
featureNamesAndLevels(_).foreach {
129+
kv => formatter.setFeatureLevel(kv._1, kv._2)
130+
})
127131
Option(namespace.getString("initial_controllers")).
128132
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
129133
if (namespace.getBoolean("standalone")) {
@@ -311,4 +315,28 @@ object StorageTool extends Logging {
311315
}
312316
}
313317
}
318+
319+
def parseNameAndLevel(input: String): (String, java.lang.Short) = {
320+
val equalsIndex = input.indexOf("=")
321+
if (equalsIndex < 0)
322+
throw new RuntimeException("Can't parse feature=level string " + input + ": equals sign not found.")
323+
val name = input.substring(0, equalsIndex).trim
324+
val levelString = input.substring(equalsIndex + 1).trim
325+
try {
326+
(name, levelString.toShort)
327+
} catch {
328+
case _: Throwable =>
329+
throw new RuntimeException("Can't parse feature=level string " + input + ": " + "unable to parse " + levelString + " as a short.")
330+
}
331+
}
332+
333+
def featureNamesAndLevels(features: java.util.List[String]): Map[String, java.lang.Short] = {
334+
val scalaFeatures = new mutable.ArrayBuffer[String]
335+
features.forEach(scalaFeatures += _)
336+
scalaFeatures.map { (feature: String) =>
337+
// Ensure the feature exists
338+
val nameAndLevel = parseNameAndLevel(feature)
339+
(nameAndLevel._1, nameAndLevel._2)
340+
}.toMap
341+
}
314342
}

core/src/test/scala/unit/kafka/tools/StorageToolTest.scala

+46
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,31 @@ Found problem:
329329
"Failed to find content in output: " + stream.toString())
330330
}
331331

332+
@Test
333+
def testFormatWithInvalidFeature(): Unit = {
334+
val availableDirs = Seq(TestUtils.tempDir())
335+
val properties = new Properties()
336+
properties.putAll(defaultStaticQuorumProperties)
337+
properties.setProperty("log.dirs", availableDirs.mkString(","))
338+
assertEquals("Unsupported feature: non.existent.feature. Supported features are: " +
339+
"group.version, kraft.version, transaction.version",
340+
assertThrows(classOf[FormatterException], () =>
341+
runFormatCommand(new ByteArrayOutputStream(), properties,
342+
Seq("--feature", "non.existent.feature=20"))).getMessage)
343+
}
344+
345+
@Test
346+
def testFormatWithInvalidKRaftVersionLevel(): Unit = {
347+
val availableDirs = Seq(TestUtils.tempDir())
348+
val properties = new Properties()
349+
properties.putAll(defaultDynamicQuorumProperties)
350+
properties.setProperty("log.dirs", availableDirs.mkString(","))
351+
assertEquals("No feature:kraft.version with feature level 999",
352+
assertThrows(classOf[IllegalArgumentException], () =>
353+
runFormatCommand(new ByteArrayOutputStream(), properties,
354+
Seq("--feature", "kraft.version=999", "--standalone"))).getMessage)
355+
}
356+
332357
@Test
333358
def testFormatWithReleaseVersionAndKRaftVersion(): Unit = {
334359
val availableDirs = Seq(TestUtils.tempDir())
@@ -532,4 +557,25 @@ Found problem:
532557
"SCRAM is only supported in metadata.version 3.5-IV2 or later.",
533558
assertThrows(classOf[FormatterException], () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage)
534559
}
560+
561+
@Test
562+
def testParseNameAndLevel(): Unit = {
563+
assertEquals(("foo.bar", 56.toShort), StorageTool.parseNameAndLevel("foo.bar=56"))
564+
}
565+
566+
@Test
567+
def testParseNameAndLevelWithNoEquals(): Unit = {
568+
assertEquals("Can't parse feature=level string kraft.version5: equals sign not found.",
569+
assertThrows(classOf[RuntimeException],
570+
() => StorageTool.parseNameAndLevel("kraft.version5")).
571+
getMessage)
572+
}
573+
574+
@Test
575+
def testParseNameAndLevelWithNoNumber(): Unit = {
576+
assertEquals("Can't parse feature=level string kraft.version=foo: unable to parse foo as a short.",
577+
assertThrows(classOf[RuntimeException],
578+
() => StorageTool.parseNameAndLevel("kraft.version=foo")).
579+
getMessage)
580+
}
535581
}

0 commit comments

Comments
 (0)