Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,8 @@ project(':server') {
compileOnly libs.bndlib
compileOnly libs.spotbugs

implementation libs.commonsValidator

implementation project(':clients')
implementation project(':metadata')
implementation project(':server-common')
Expand Down Expand Up @@ -1016,7 +1018,6 @@ project(':core') {
implementation project(':share-coordinator')

implementation libs.argparse4j
implementation libs.commonsValidator
implementation libs.jacksonDatabind
implementation libs.jacksonDataformatCsv
implementation libs.jacksonJDK8Datatypes
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control-server.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
<allow pkg="org.apache.kafka.server.telemetry" />
</subpackage>
<subpackage name="config">
<allow pkg="org.apache.commons"/>
<allow pkg="org.apache.kafka.server" />
<allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.storage.internals.log" />
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.server
import java.util
import java.util.concurrent.TimeUnit
import java.util.Properties
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Logging
import kafka.utils.Implicits._
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource, TopicConfig}
Expand Down Expand Up @@ -435,7 +435,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}

def listeners: Seq[Endpoint] =
CoreUtils.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG), effectiveListenerSecurityProtocolMap)
AbstractKafkaConfig.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG), effectiveListenerSecurityProtocolMap).asScala

def controllerListeners: Seq[Endpoint] =
listeners.filter(l => controllerListenerNames.contains(l.listener))
Expand All @@ -452,7 +452,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
def effectiveAdvertisedControllerListeners: Seq[Endpoint] = {
val advertisedListenersProp = getList(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
AbstractKafkaConfig.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, false).asScala
.filter(l => controllerListenerNames.contains(l.listener))
} else {
Seq.empty
Expand Down Expand Up @@ -482,7 +482,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
// Use advertised listeners if defined, fallback to listeners otherwise
val advertisedListenersProp = getList(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
val advertisedListeners = if (advertisedListenersProp != null) {
CoreUtils.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
AbstractKafkaConfig.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, false).asScala
} else {
listeners
}
Expand Down
82 changes: 0 additions & 82 deletions core/src/main/scala/kafka/utils/CoreUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,9 @@ import java.lang.management.ManagementFactory
import com.typesafe.scalalogging.Logger

import javax.management.ObjectName
import scala.collection.Seq
import org.apache.commons.validator.routines.InetAddressValidator
import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.network.SocketServerConfigs
import org.slf4j.event.Level

import scala.jdk.CollectionConverters._

/**
* General helper functions!
*
Expand All @@ -48,8 +40,6 @@ import scala.jdk.CollectionConverters._
object CoreUtils {
private val logger = Logger(getClass)

private val inetAddressValidator = InetAddressValidator.getInstance()

/**
* Do the given action and log any exceptions thrown without rethrowing them.
*
Expand Down Expand Up @@ -121,76 +111,4 @@ object CoreUtils {

def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun)

def listenerListToEndPoints(listeners: java.util.List[String], securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = {
listenerListToEndPoints(listeners, securityProtocolMap, requireDistinctPorts = true)
}

private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners: java.util.List[String]): Unit = {
val distinctPorts = endpoints.map(_.port).distinct
require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
}

def listenerListToEndPoints(listeners: java.util.List[String], securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[Endpoint] = {
def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
(inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
(inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))

def validate(endPoints: Seq[Endpoint]): Unit = {
val distinctListenerNames = endPoints.map(_.listener).distinct
require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")

val (duplicatePorts, _) = endPoints.filter {
// filter port 0 for unit tests
ep => ep.port != 0
}.groupBy(_.port).partition {
case (_, endpoints) => endpoints.size > 1
}

// Exception case, let's allow duplicate ports if one host is on IPv4 and the other one is on IPv6
val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
case (port, eps) =>
(port, eps.partition(ep =>
ep.host != null && inetAddressValidator.isValid(ep.host)
))
}

// Iterate through every grouping of duplicates by port to see if they are valid
duplicatePortsPartitionedByValidIps.foreach {
case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
if (requireDistinctPorts)
checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

duplicatesWithIpHosts match {
case eps if eps.isEmpty =>
case Seq(ep1, ep2) =>
if (requireDistinctPorts) {
val errorMessage = "If you have two listeners on " +
s"the same port then one needs to be IPv4 and the other IPv6, listeners: $listeners, port: $port"
require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), errorMessage)

// If we reach this point it means that even though duplicatesWithIpHosts in isolation can be valid, if
// there happens to be ANOTHER listener on this port without an IP host (such as a null host) then its
// not valid.
if (duplicatesWithoutIpHosts.nonEmpty)
throw new IllegalArgumentException(errorMessage)
}
case _ =>
// Having more than 2 duplicate endpoints doesn't make sense since we only have 2 IP stacks (one is IPv4
// and the other is IPv6)
if (requireDistinctPorts)
throw new IllegalArgumentException("Each listener must have a different port unless exactly one listener has " +
s"an IPv4 address and the other IPv6 address, listeners: $listeners, port: $port")
}
}
}

val endPoints = try {
SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap).asScala
} catch {
case e: Exception =>
throw new IllegalArgumentException(s"Error creating broker listeners from '$listeners': ${e.getMessage}", e)
}
validate(endPoints)
endPoints
}
}
10 changes: 5 additions & 5 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.net.InetSocketAddress
import java.util
import java.util.{Arrays, Collections, Properties}
import kafka.utils.TestUtils.assertBadConfigContainingMessage
import kafka.utils.{CoreUtils, TestUtils}
import kafka.utils.TestUtils
import org.apache.kafka.common.{Endpoint, Node}
import org.apache.kafka.common.config.{AbstractConfig, ConfigException, SaslConfigs, SecurityConfig, SslConfigs, TopicConfig}
import org.apache.kafka.common.metrics.Sensor
Expand All @@ -37,7 +37,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.storage.internals.log.CleanerConfig
Expand Down Expand Up @@ -611,7 +611,7 @@ class KafkaConfigTest {

private def listenerListToEndPoints(listenerList: java.util.List[String],
securityProtocolMap: util.Map[ListenerName, SecurityProtocol] = SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO) =
CoreUtils.listenerListToEndPoints(listenerList, securityProtocolMap)
AbstractKafkaConfig.listenerListToEndPoints(listenerList, securityProtocolMap)

@Test
def testListenerDefaults(): Unit = {
Expand All @@ -623,9 +623,9 @@ class KafkaConfigTest {

// configuration with no listeners
val conf = KafkaConfig.fromProps(props)
assertEquals(listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")), conf.listeners)
assertEquals(listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")).asScala, conf.listeners)
assertNull(conf.listeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get.host)
assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")))
assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")).asScala)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be in reverse order.

Suggested change
assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")).asScala)
assertEquals(listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")).asScala, conf.effectiveAdvertisedBrokerListeners))

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, thanks!

}

private def isValidKafkaConfig(props: Properties): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.server.config;

import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand All @@ -39,10 +40,13 @@
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;

import org.apache.commons.validator.routines.InetAddressValidator;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
Expand All @@ -52,6 +56,10 @@
* For more details check KAFKA-15853
*/
public abstract class AbstractKafkaConfig extends AbstractConfig {

private static final InetAddressValidator INET_ADDRESS_VALIDATOR = InetAddressValidator.getInstance();


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant empty line.

public static final ConfigDef CONFIG_DEF = Utils.mergeConfigs(List.of(
RemoteLogManagerConfig.configDef(),
ServerConfigs.CONFIG_DEF,
Expand Down Expand Up @@ -161,6 +169,80 @@ public static Map<String, String> getMap(String propName, String propValue) {
}
}

public static List<Endpoint> listenerListToEndPoints(List<String> listeners, Map<ListenerName, SecurityProtocol> securityProtocolMap) {
return listenerListToEndPoints(listeners, securityProtocolMap, true);
}

public static List<Endpoint> listenerListToEndPoints(List<String> listeners, Map<ListenerName, SecurityProtocol> securityProtocolMap, boolean requireDistinctPorts) {
List<Endpoint> endPoints;
try {
endPoints = SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap);
} catch (Exception e) {
throw new IllegalArgumentException(String.format("Error creating broker listeners from '%s': %s", listeners, e.getMessage()), e);
}
validate(endPoints, listeners, requireDistinctPorts);
return endPoints;
}
Comment on lines +176 to +185
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we do the following?

Suggested change
public static List<Endpoint> listenerListToEndPoints(List<String> listeners, Map<ListenerName, SecurityProtocol> securityProtocolMap, boolean requireDistinctPorts) {
List<Endpoint> endPoints;
try {
endPoints = SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap);
} catch (Exception e) {
throw new IllegalArgumentException(String.format("Error creating broker listeners from '%s': %s", listeners, e.getMessage()), e);
}
validate(endPoints, listeners, requireDistinctPorts);
return endPoints;
}
public static List<Endpoint> listenerListToEndPoints(List<String> listeners, Map<ListenerName, SecurityProtocol> securityProtocolMap, boolean requireDistinctPorts) {
try {
List<Endpoint> endPoints = SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap);
validate(endPoints, listeners, requireDistinctPorts);
return endPoints;
} catch (Exception e) {
throw new IllegalArgumentException(String.format("Error creating broker listeners from '%s': %s", listeners, e.getMessage()), e);
}
}


private static void validate(List<Endpoint> endPoints, List<String> listeners, boolean requireDistinctPorts) {
long distinctListenerNames = endPoints.stream().map(Endpoint::listener).distinct().count();
if (distinctListenerNames != endPoints.size()) {
throw new IllegalArgumentException("Each listener must have a different name, listeners: " + listeners);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can return early if requireDistinctPorts is false, right? for example:

private static void validate(List<Endpoint> endPoints, List<String> listeners, boolean requireDistinctPorts) {
    long distinct = endPoints.stream().map(Endpoint::listener).distinct().count();
    if (distinct != endPoints.size()) {
        throw new IllegalArgumentException("Each listener must have a different name, listeners: " + listeners);
    }

    if (!requireDistinctPorts) return;

  ...

Map<Integer, List<Endpoint>> duplicatePorts = endPoints.stream()
.filter(ep -> ep.port() != 0) // filter port 0 for unit tests
.collect(Collectors.groupingBy(Endpoint::port))
.entrySet().stream()
.filter(entry -> entry.getValue().size() > 1)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we can call forEach in this phase, right?


duplicatePorts.forEach((port, eps) -> {
Map<Boolean, List<Endpoint>> partitionedByValidIp = eps.stream()
.collect(Collectors.partitioningBy(ep -> ep.host() != null && INET_ADDRESS_VALIDATOR.isValid(ep.host())));

List<Endpoint> duplicatesWithIpHosts = partitionedByValidIp.get(true);
List<Endpoint> duplicatesWithoutIpHosts = partitionedByValidIp.get(false);

if (requireDistinctPorts) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requireDistinctPorts is always false, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, thanks.

checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners);
}

if (duplicatesWithIpHosts.isEmpty()) {
// No-op
} else if (duplicatesWithIpHosts.size() == 2) {
if (requireDistinctPorts) {
String errorMessage = "If you have two listeners on the same port then one needs to be IPv4 and the other IPv6, listeners: " + listeners + ", port: " + port;
Endpoint ep1 = duplicatesWithIpHosts.get(0);
Endpoint ep2 = duplicatesWithIpHosts.get(1);
if (!validateOneIsIpv4AndOtherIpv6(ep1.host(), ep2.host())) {
throw new IllegalArgumentException(errorMessage);
}

if (!duplicatesWithoutIpHosts.isEmpty()) {
throw new IllegalArgumentException(errorMessage);
}
}
} else {
if (requireDistinctPorts) {
throw new IllegalArgumentException("Each listener must have a different port unless exactly one listener has an IPv4 address and the other IPv6 address, listeners: " + listeners + ", port: " + port);
}
}
});
}

private static boolean validateOneIsIpv4AndOtherIpv6(String first, String second) {
return (INET_ADDRESS_VALIDATOR.isValidInet4Address(first) && INET_ADDRESS_VALIDATOR.isValidInet6Address(second)) ||
(INET_ADDRESS_VALIDATOR.isValidInet6Address(first) && INET_ADDRESS_VALIDATOR.isValidInet4Address(second));
}

private static void checkDuplicateListenerPorts(List<Endpoint> endpoints, List<String> listeners) {
Set<String> distinctHosts = endpoints.stream().map(ep -> ep.host() == null ? "" : ep.host()).collect(Collectors.toSet());
if (endpoints.size() > distinctHosts.size()) {
throw new IllegalArgumentException("Each listener must have a different port, listeners: " + listeners);
}
}

private static SecurityProtocol securityProtocol(String protocolName, String configName) {
try {
return SecurityProtocol.forName(protocolName);
Expand Down