Skip to content

Commit

Permalink
KAFKA-17543: Enforce that broker.id.generation.enable is not used whe…
Browse files Browse the repository at this point in the history
…n migrating to KRaft (#17192)

Reviewers: Chia-Ping Tsai <[email protected]>, David Arthur <[email protected]>
  • Loading branch information
cmccabe authored Sep 14, 2024
1 parent 6610a4d commit d7a456e
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 4 deletions.
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
throw new ConfigException(s"Missing required configuration `${ZkConfigs.ZK_CONNECT_CONFIG}` which has no default value.")
}
if (brokerIdGenerationEnable) {
if (migrationEnabled) {
require(brokerId != -1, "broker id generation is incompatible with migration to ZK. Please disable it before enabling migration")
}
require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id")
} else {
require(brokerId >= 0, "broker.id must be greater than or equal to 0")
Expand Down Expand Up @@ -963,6 +966,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
} else {
// ZK-based
if (migrationEnabled) {
require(brokerId >= 0,
"broker broker.id.generation.enable is incompatible with migration to ZK. Please disable it before enabling migration")
validateQuorumVotersAndQuorumBootstrapServerForMigration()
require(controllerListenerNames.nonEmpty,
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not be empty when running in ZooKeeper migration mode: ${controllerListenerNames.asJava}")
Expand Down
3 changes: 0 additions & 3 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,6 @@ class KafkaServer(

/* generate brokerId */
config._brokerId = getOrGenerateBrokerId(initialMetaPropsEnsemble)
// Currently, we are migrating from ZooKeeper to KRaft. If broker.id.generation.enable is set to true,
// we must ensure that the nodeId synchronizes with the broker.id to prevent the nodeId from being -1,
// which would result in a failure during the migration.
config._nodeId = config.brokerId
logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
this.logIdent = logContext.logPrefix
Expand Down
11 changes: 11 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,17 @@ class KafkaConfigTest {
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
}

@Test
def testMigrationCannotBeEnabledWithBrokerIdGeneration(): Unit = {
val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort, logDirCount = 2)
props.setProperty(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "3000@localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
assertEquals(
"requirement failed: broker id generation is incompatible with migration to ZK. Please disable it before enabling migration",
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage)
}

@Test
def testMigrationEnabledKRaftMode(): Unit = {
val props = new Properties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,5 +201,4 @@ class KafkaServerTest extends QuorumTestHarness {
val kafkaConfig = KafkaConfig.fromProps(props)
TestUtils.createServer(kafkaConfig)
}

}

0 comments on commit d7a456e

Please sign in to comment.