diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 8af29be4ba8da..29734bd8d8d20 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -134,6 +134,10 @@ object StorageTool extends Logging { case None => Option(config.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)). foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString))) } + Option(namespace.getList[String]("feature")).foreach( + featureNamesAndLevels(_).foreachEntry { + (k, v) => formatter.setFeatureLevel(k, v) + }) Option(namespace.getString("initial_controllers")). foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v))) if (namespace.getBoolean("standalone")) { @@ -466,4 +470,26 @@ object StorageTool extends Logging { } } } + + def parseNameAndLevel(input: String): (String, java.lang.Short) = { + val equalsIndex = input.indexOf("=") + if (equalsIndex < 0) + throw new RuntimeException("Can't parse feature=level string " + input + ": equals sign not found.") + val name = input.substring(0, equalsIndex).trim + val levelString = input.substring(equalsIndex + 1).trim + try { + (name, levelString.toShort) + } catch { + case _: Throwable => + throw new RuntimeException("Can't parse feature=level string " + input + ": " + "unable to parse " + levelString + " as a short.") + } + } + + def featureNamesAndLevels(features: java.util.List[String]): Map[String, java.lang.Short] = { + features.asScala.map { (feature: String) => + // Ensure the feature exists + val nameAndLevel = parseNameAndLevel(feature) + (nameAndLevel._1, nameAndLevel._2) + }.toMap + } } diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index e93cb84d002ac..beff77cf52377 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -325,10 +325,35 @@ Found problem: properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() assertEquals(0, runFormatCommand(stream, properties, Seq("--feature", "metadata.version=20"))) - assertTrue(stream.toString().contains("4.0-IV0"), + assertTrue(stream.toString().contains("3.8-IV0"), "Failed to find content in output: " + stream.toString()) } + @Test + def testFormatWithInvalidFeature(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultStaticQuorumProperties) + properties.setProperty("log.dirs", availableDirs.mkString(",")) + assertEquals("Unsupported feature: non.existent.feature. Supported features are: " + + "group.version, kraft.version, transaction.version", + assertThrows(classOf[FormatterException], () => + runFormatCommand(new ByteArrayOutputStream(), properties, + Seq("--feature", "non.existent.feature=20"))).getMessage) + } + + @Test + def testFormatWithInvalidKRaftVersionLevel(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultDynamicQuorumProperties) + properties.setProperty("log.dirs", availableDirs.mkString(",")) + assertEquals("No feature:kraft.version with feature level 999", + assertThrows(classOf[IllegalArgumentException], () => + runFormatCommand(new ByteArrayOutputStream(), properties, + Seq("--feature", "kraft.version=999", "--standalone"))).getMessage) + } + @Test def testFormatWithReleaseVersionAndKRaftVersion(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) @@ -711,4 +736,25 @@ Found problem: "SCRAM is only supported in metadata.version 3.5-IV2 or later.", assertThrows(classOf[FormatterException], () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage) } + + @Test + def testParseNameAndLevel(): Unit = { + assertEquals(("foo.bar", 56.toShort), StorageTool.parseNameAndLevel("foo.bar=56")) + } + + @Test + def testParseNameAndLevelWithNoEquals(): Unit = { + assertEquals("Can't parse feature=level string kraft.version5: equals sign not found.", + assertThrows(classOf[RuntimeException], + () => StorageTool.parseNameAndLevel("kraft.version5")). + getMessage) + } + + @Test + def testParseNameAndLevelWithNoNumber(): Unit = { + assertEquals("Can't parse feature=level string kraft.version=foo: unable to parse foo as a short.", + assertThrows(classOf[RuntimeException], + () => StorageTool.parseNameAndLevel("kraft.version=foo")). + getMessage) + } }