Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 3 additions & 230 deletions core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.server.IntegrationTestUtils.connectAndReceive
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.errors.{InvalidPartitionsException, PolicyViolationException, UnsupportedVersionException}
import org.apache.kafka.common.message.DescribeClusterRequestData
Expand All @@ -33,20 +32,16 @@ import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.quota.ClientQuotaAlteration.Op
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, TopicPartition, TopicPartitionInfo}
import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo}
import org.apache.kafka.controller.{QuorumController, QuorumControllerIntegrationTestUtils}
import org.apache.kafka.image.ClusterImage
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, MetadataVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.quota
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType}
import org.apache.kafka.storage.internals.log.UnifiedLog
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Tag, Test, Timeout}
Expand All @@ -58,8 +53,7 @@ import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{FileSystems, Files, Path, Paths}
import java.{lang, util}
import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ExecutionException, TimeUnit}
import java.util.{Optional, OptionalLong, Properties}
import scala.collection.{Seq, mutable}
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
Expand Down Expand Up @@ -1161,22 +1155,6 @@ class KRaftClusterTest {
}
}

@Test
def testAuthorizerFailureFoundInControllerStartup(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumControllerNodes(3).build()).
setConfigProp("authorizer.class.name", classOf[BadAuthorizer].getName).build()
try {
cluster.format()
val exception = assertThrows(classOf[ExecutionException], () => cluster.startup())
assertEquals("java.lang.IllegalStateException: test authorizer exception", exception.getMessage)
cluster.fatalFaultHandler().setIgnore(true)
} finally {
cluster.close()
}
}

/**
* Test a single broker, single controller cluster at the minimum bootstrap level. This tests
* that we can function without having periodic NoOpRecords written.
Expand All @@ -1197,89 +1175,6 @@ class KRaftClusterTest {
}
}

@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testReconfigureControllerClientQuotas(combinedController: Boolean): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setCombined(combinedController).
setNumControllerNodes(1).build()).
setConfigProp("client.quota.callback.class", classOf[DummyClientQuotaCallback].getName).
setConfigProp(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey, "0").
build()

def assertConfigValue(expected: Int): Unit = {
TestUtils.retry(60000) {
assertEquals(expected, cluster.controllers().values().iterator().next().
quotaManagers.clientQuotaCallbackPlugin.get.get.asInstanceOf[DummyClientQuotaCallback].value)
assertEquals(expected, cluster.brokers().values().iterator().next().
quotaManagers.clientQuotaCallbackPlugin.get.get.asInstanceOf[DummyClientQuotaCallback].value)
}
}

try {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
assertConfigValue(0)
val admin = Admin.create(cluster.clientProperties())
try {
admin.incrementalAlterConfigs(
util.Map.of(new ConfigResource(Type.BROKER, ""),
util.List.of(new AlterConfigOp(
new ConfigEntry(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey, "1"), OpType.SET)))).
all().get()
} finally {
admin.close()
}
assertConfigValue(1)
} finally {
cluster.close()
}
}

@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testReconfigureControllerAuthorizer(combinedMode: Boolean): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setCombined(combinedMode).
setNumControllerNodes(1).build()).
setConfigProp("authorizer.class.name", classOf[FakeConfigurableAuthorizer].getName).
build()

def assertFoobarValue(expected: Int): Unit = {
TestUtils.retry(60000) {
assertEquals(expected, cluster.controllers().values().iterator().next().
authorizerPlugin.get.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get())
assertEquals(expected, cluster.brokers().values().iterator().next().
authorizerPlugin.get.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get())
}
}

try {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
assertFoobarValue(0)
val admin = Admin.create(cluster.clientProperties())
try {
admin.incrementalAlterConfigs(
util.Map.of(new ConfigResource(Type.BROKER, ""),
util.List.of(new AlterConfigOp(
new ConfigEntry(FakeConfigurableAuthorizer.foobarConfigKey, "123"), OpType.SET)))).
all().get()
} finally {
admin.close()
}
assertFoobarValue(123)
} finally {
cluster.close()
}
}

@Test
def testOverlyLargeCreateTopics(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
Expand Down Expand Up @@ -1718,125 +1613,3 @@ class KRaftClusterTest {
}
}
}

class BadAuthorizer extends Authorizer {

override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
throw new IllegalStateException("test authorizer exception")
}

override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = ???

override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = ???

override def close(): Unit = {}

override def configure(configs: util.Map[String, _]): Unit = {}

override def createAcls(requestContext: AuthorizableRequestContext, aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = ???

override def deleteAcls(requestContext: AuthorizableRequestContext, aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: CompletionStage[AclDeleteResult]] = ???
}

object DummyClientQuotaCallback {
val dummyClientQuotaCallbackValueConfigKey = "dummy.client.quota.callback.value"
}

class DummyClientQuotaCallback extends ClientQuotaCallback with Reconfigurable {
var value = 0
override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] = util.Map.of

override def quotaLimit(quotaType: ClientQuotaType, metricTags: util.Map[String, String]): lang.Double = 1.0

override def updateQuota(quotaType: ClientQuotaType, quotaEntity: quota.ClientQuotaEntity, newValue: Double): Unit = {}

override def removeQuota(quotaType: ClientQuotaType, quotaEntity: quota.ClientQuotaEntity): Unit = {}

override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = true

override def updateClusterMetadata(cluster: Cluster): Boolean = false

override def close(): Unit = {}

override def configure(configs: util.Map[String, _]): Unit = {
val newValue = configs.get(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey)
if (newValue != null) {
value = Integer.parseInt(newValue.toString)
}
}

override def reconfigurableConfigs(): util.Set[String] = util.Set.of(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey)

override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
}

override def reconfigure(configs: util.Map[String, _]): Unit = configure(configs)
}

object FakeConfigurableAuthorizer {
val foobarConfigKey = "fake.configurable.authorizer.foobar.config"

def fakeConfigurableAuthorizerConfigToInt(configs: util.Map[String, _]): Int = {
val result = configs.get(foobarConfigKey)
if (result == null) {
0
} else {
val resultString = result.toString.trim()
try {
Integer.valueOf(resultString)
} catch {
case _: NumberFormatException => throw new ConfigException(s"Bad value of $foobarConfigKey: $resultString")
}
}
}
}

class FakeConfigurableAuthorizer extends Authorizer with Reconfigurable {
import FakeConfigurableAuthorizer._

val foobar = new AtomicInteger(0)

override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
serverInfo.endpoints().asScala.map(e => e -> {
val future = new CompletableFuture[Void]
future.complete(null)
future
}).toMap.asJava
}

override def reconfigurableConfigs(): util.Set[String] = util.Set.of(foobarConfigKey)

override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
fakeConfigurableAuthorizerConfigToInt(configs)
}

override def reconfigure(configs: util.Map[String, _]): Unit = {
foobar.set(fakeConfigurableAuthorizerConfigToInt(configs))
}

override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = {
actions.asScala.map(_ => AuthorizationResult.ALLOWED).toList.asJava
}

override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = util.List.of[AclBinding]()

override def close(): Unit = {}

override def configure(configs: util.Map[String, _]): Unit = {
foobar.set(fakeConfigurableAuthorizerConfigToInt(configs))
}

override def createAcls(
requestContext: AuthorizableRequestContext,
aclBindings: util.List[AclBinding]
): util.List[_ <: CompletionStage[AclCreateResult]] = {
util.List.of
}

override def deleteAcls(
requestContext: AuthorizableRequestContext,
aclBindingFilters: util.List[AclBindingFilter]
): util.List[_ <: CompletionStage[AclDeleteResult]] = {
util.List.of
}
}
Loading
Loading