Skip to content

Commit 113d6ac

Browse files
FrankYang0529 authored and mjsax committed
KAFKA-18405 Remove ZooKeeper logic from DynamicBrokerConfig (apache#18508)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 135603b commit 113d6ac

File tree

7 files changed

+55
-103
lines changed

7 files changed

+55
-103
lines changed

Diff for: core/src/main/scala/kafka/server/BrokerServer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ class BrokerServer(
193193
info("Starting broker")
194194

195195
val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
196-
config.dynamicConfig.initialize(zkClientOpt = None, Some(clientMetricsReceiverPlugin))
196+
config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))
197197

198198
/* start scheduler */
199199
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)

Diff for: core/src/main/scala/kafka/server/ControllerServer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ class ControllerServer(
124124
try {
125125
this.logIdent = logContext.logPrefix()
126126
info("Starting controller")
127-
config.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)
127+
config.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)
128128

129129
maybeChangeStatus(STARTING, STARTED)
130130

Diff for: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

+9-69
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import kafka.log.{LogCleaner, LogManager}
2626
import kafka.network.{DataPlaneAcceptor, SocketServer}
2727
import kafka.server.DynamicBrokerConfig._
2828
import kafka.utils.{CoreUtils, Logging}
29-
import kafka.zk.{AdminZkClient, KafkaZkClient}
3029
import org.apache.kafka.common.Reconfigurable
3130
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
3231
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
@@ -39,7 +38,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3938
import org.apache.kafka.network.SocketServerConfigs
4039
import org.apache.kafka.security.PasswordEncoder
4140
import org.apache.kafka.server.ProcessRole
42-
import org.apache.kafka.server.config.{ConfigType, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
41+
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
4342
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
4443
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
4544
import org.apache.kafka.server.telemetry.ClientTelemetry
@@ -58,8 +57,6 @@ import scala.jdk.CollectionConverters._
5857
* </ul>
5958
* The order of precedence for broker configs is:
6059
* <ol>
61-
* <li>DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}</li>
62-
* <li>DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/&lt;default&gt;</li>
6360
* <li>STATIC_BROKER_CONFIG: properties that broker is started up with, typically from server.properties file</li>
6461
* <li>DEFAULT_CONFIG: Default configs defined in KafkaConfig</li>
6562
* </ol>
@@ -215,17 +212,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
215212
private var currentConfig: KafkaConfig = _
216213
private val dynamicConfigPasswordEncoder = Some(PasswordEncoder.NOOP)
217214

218-
private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
215+
private[server] def initialize(clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
219216
currentConfig = new KafkaConfig(kafkaConfig.props, false)
220217
metricsReceiverPluginOpt = clientMetricsReceiverPluginOpt
221-
222-
zkClientOpt.foreach { zkClient =>
223-
val adminZkClient = new AdminZkClient(zkClient)
224-
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.BROKER, ZooKeeperInternals.DEFAULT_STRING), false)
225-
val props = adminZkClient.fetchEntityConfig(ConfigType.BROKER, kafkaConfig.brokerId.toString)
226-
val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
227-
updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
228-
}
229218
}
230219

231220
/**
@@ -427,14 +416,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
427416
props
428417
}
429418

430-
// If the secret has changed, password.encoder.old.secret contains the old secret that was used
431-
// to encode the configs in ZK. Decode passwords using the old secret and update ZK with values
432-
// encoded using the current secret. Ignore any errors during decoding since old secret may not
433-
// have been removed during broker restart.
434-
private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
435-
persistentProps.clone().asInstanceOf[Properties]
436-
}
437-
438419
/**
439420
* Validate the provided configs `propsOverride` and return the full Kafka configs with
440421
* the configured defaults and these overrides.
@@ -900,7 +881,6 @@ object DynamicListenerConfig {
900881
*/
901882
val ReconfigurableConfigs = Set(
902883
// Listener configs
903-
SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
904884
SocketServerConfigs.LISTENERS_CONFIG,
905885
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
906886

@@ -986,40 +966,16 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
986966
DynamicListenerConfig.ReconfigurableConfigs
987967
}
988968

989-
private def listenerRegistrationsAltered(
990-
oldAdvertisedListeners: Map[ListenerName, EndPoint],
991-
newAdvertisedListeners: Map[ListenerName, EndPoint]
992-
): Boolean = {
993-
if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true
994-
oldAdvertisedListeners.foreachEntry {
995-
case (oldListenerName, oldEndpoint) =>
996-
newAdvertisedListeners.get(oldListenerName) match {
997-
case None => return true
998-
case Some(newEndpoint) => if (!newEndpoint.equals(oldEndpoint)) {
999-
return true
1000-
}
1001-
}
1002-
}
1003-
false
1004-
}
1005-
1006-
private def verifyListenerRegistrationAlterationSupported(): Unit = {
1007-
if (!server.config.requiresZookeeper) {
1008-
throw new ConfigException("Advertised listeners cannot be altered when using a " +
1009-
"Raft-based metadata quorum.")
1010-
}
1011-
}
1012-
1013969
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
1014970
val oldConfig = server.config
1015-
val newListeners = listenersToMap(newConfig.listeners)
1016-
val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedBrokerListeners)
1017-
val oldListeners = listenersToMap(oldConfig.listeners)
1018-
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
1019-
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
1020-
if (!newListeners.keySet.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
971+
val newListeners = newConfig.listeners.map(_.listenerName).toSet
972+
val oldAdvertisedListeners = oldConfig.effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
973+
val oldListeners = oldConfig.listeners.map(_.listenerName).toSet
974+
if (!oldAdvertisedListeners.subsetOf(newListeners))
975+
throw new ConfigException(s"Advertised listeners '$oldAdvertisedListeners' must be a subset of listeners '$newListeners'")
976+
if (!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
1021977
throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.effectiveListenerSecurityProtocolMap}'")
1022-
newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
978+
newListeners.intersect(oldListeners).foreach { listenerName =>
1023979
def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
1024980
kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case (key, _) =>
1025981
// skip the reconfigurable configs
@@ -1032,15 +988,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
1032988
if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != newConfig.effectiveListenerSecurityProtocolMap(listenerName))
1033989
throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName")
1034990
}
1035-
if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
1036-
throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}")
1037-
1038-
// Currently, we do not support adding or removing listeners when in KRaft mode.
1039-
// However, we support changing other listener configurations (max connections, etc.)
1040-
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
1041-
listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
1042-
verifyListenerRegistrationAlterationSupported()
1043-
}
1044991
}
1045992

1046993
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
@@ -1055,13 +1002,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
10551002
if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
10561003
if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
10571004
}
1058-
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
1059-
listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
1060-
verifyListenerRegistrationAlterationSupported()
1061-
server match {
1062-
case _ => throw new RuntimeException("Unable to handle reconfigure")
1063-
}
1064-
}
10651005
}
10661006

10671007
private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =

Diff for: core/src/main/scala/kafka/server/SharedServer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ class SharedServer(
268268
// This is only done in tests.
269269
metrics = new Metrics()
270270
}
271-
sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None, clientMetricsReceiverPluginOpt = None)
271+
sharedServerConfig.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)
272272

273273
if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
274274
brokerMetrics = new BrokerServerMetrics(metrics)

Diff for: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala

+1-5
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package kafka.server.metadata
1919

20-
import java.util.{OptionalInt, Properties}
20+
import java.util.OptionalInt
2121
import kafka.coordinator.transaction.TransactionCoordinator
2222
import kafka.log.LogManager
2323
import kafka.server.{KafkaConfig, ReplicaManager}
@@ -243,10 +243,6 @@ class BrokerMetadataPublisher(
243243
}
244244
}
245245

246-
def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {
247-
config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
248-
}
249-
250246
/**
251247
* Update the coordinator of local replica changes: election and resignation.
252248
*

0 commit comments

Comments (0)