Skip to content

Commit ab32a19

Browse files
committed
address comment
Signed-off-by: PoAn Yang <[email protected]>
1 parent 4afd9c0 commit ab32a19

File tree

2 files changed

+20
-43
lines changed

2 files changed

+20
-43
lines changed

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

+4-43
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,6 @@ object DynamicListenerConfig {
881881
*/
882882
val ReconfigurableConfigs = Set(
883883
// Listener configs
884-
SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
885884
SocketServerConfigs.LISTENERS_CONFIG,
886885
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
887886

@@ -967,37 +966,13 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
967966
DynamicListenerConfig.ReconfigurableConfigs
968967
}
969968

970-
private def listenerRegistrationsAltered(
971-
oldAdvertisedListeners: Map[ListenerName, EndPoint],
972-
newAdvertisedListeners: Map[ListenerName, EndPoint]
973-
): Boolean = {
974-
if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true
975-
oldAdvertisedListeners.foreachEntry {
976-
case (oldListenerName, oldEndpoint) =>
977-
newAdvertisedListeners.get(oldListenerName) match {
978-
case None => return true
979-
case Some(newEndpoint) => if (!newEndpoint.equals(oldEndpoint)) {
980-
return true
981-
}
982-
}
983-
}
984-
false
985-
}
986-
987-
private def verifyListenerRegistrationAlterationSupported(): Unit = {
988-
if (!server.config.requiresZookeeper) {
989-
throw new ConfigException("Advertised listeners cannot be altered when using a " +
990-
"Raft-based metadata quorum.")
991-
}
992-
}
993-
994969
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
995970
val oldConfig = server.config
996971
val newListeners = listenersToMap(newConfig.listeners)
997-
val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedBrokerListeners)
972+
val oldAdvertisedListeners = listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners)
998973
val oldListeners = listenersToMap(oldConfig.listeners)
999-
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
1000-
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
974+
if (!oldAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
975+
throw new ConfigException(s"Advertised listeners '$oldAdvertisedListeners' must be a subset of listeners '$newListeners'")
1001976
if (!newListeners.keySet.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
1002977
throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.effectiveListenerSecurityProtocolMap}'")
1003978
newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
@@ -1013,15 +988,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
1013988
if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != newConfig.effectiveListenerSecurityProtocolMap(listenerName))
1014989
throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName")
1015990
}
1016-
if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
991+
if (!oldAdvertisedListeners.contains(newConfig.interBrokerListenerName))
1017992
throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}")
1018-
1019-
// Currently, we do not support adding or removing listeners when in KRaft mode.
1020-
// However, we support changing other listener configurations (max connections, etc.)
1021-
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
1022-
listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
1023-
verifyListenerRegistrationAlterationSupported()
1024-
}
1025993
}
1026994

1027995
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
@@ -1036,13 +1004,6 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
10361004
if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
10371005
if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
10381006
}
1039-
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedBrokerListeners),
1040-
listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
1041-
verifyListenerRegistrationAlterationSupported()
1042-
server match {
1043-
case _ => throw new RuntimeException("Unable to handle reconfigure")
1044-
}
1045-
}
10461007
}
10471008

10481009
private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala

+16
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
3030
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
3131
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
3232
import org.apache.kafka.common.network.ListenerName
33+
import org.apache.kafka.common.security.auth.SecurityProtocol
3334
import org.apache.kafka.raft.QuorumConfig
3435
import org.apache.kafka.network.SocketServerConfigs
3536
import org.apache.kafka.server.authorizer._
@@ -1020,6 +1021,21 @@ class DynamicBrokerConfigTest {
10201021
assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis)
10211022
assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG))
10221023
}
1024+
1025+
@Test
1026+
def testAdvertisedListenersIsNotDynamicallyReconfigurable(): Unit = {
1027+
val origProps = TestUtils.createBrokerConfig(0, port = 8181)
1028+
val ctx = new DynamicLogConfigContext(origProps)
1029+
1030+
// update advertised listeners should not work
1031+
val props = new Properties()
1032+
props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "SASL_PLAINTEXT://localhost:8181")
1033+
ctx.config.dynamicConfig.updateDefaultConfig(props)
1034+
ctx.config.effectiveAdvertisedBrokerListeners.foreach(e =>
1035+
assertEquals(SecurityProtocol.PLAINTEXT.name, e.listenerName.value)
1036+
)
1037+
assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG))
1038+
}
10231039
}
10241040

10251041
class TestDynamicThreadPool extends BrokerReconfigurable {

0 commit comments

Comments
 (0)