From 2056edbac979d989a0d37e1d2e05fdbb129245c3 Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Fri, 17 Oct 2025 09:36:26 +0800 Subject: [PATCH 1/3] KAFKA-19755 [1/N]:Move KRaftClusterTest from core module to server module --- .../kafka/server/KRaftClusterTest.scala | 231 +----------- .../apache/kafka/server/KRaftClusterTest.java | 353 ++++++++++++++++++ 2 files changed, 355 insertions(+), 229 deletions(-) create mode 100644 server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 6f552a8ebe96c..2e4487b06f9e2 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -22,7 +22,6 @@ import kafka.utils.TestUtils import org.apache.kafka.server.IntegrationTestUtils.connectAndReceive import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin._ -import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} import org.apache.kafka.common.config.{ConfigException, ConfigResource} import org.apache.kafka.common.config.ConfigResource.Type import org.apache.kafka.common.errors.{InvalidPartitionsException, PolicyViolationException, UnsupportedVersionException} @@ -33,20 +32,16 @@ import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.quota.ClientQuotaAlteration.Op import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent} import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse} -import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes} -import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, TopicPartition, TopicPartitionInfo} +import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo} import org.apache.kafka.controller.{QuorumController, QuorumControllerIntegrationTestUtils} import org.apache.kafka.image.ClusterImage import org.apache.kafka.metadata.BrokerState import org.apache.kafka.metadata.bootstrap.BootstrapMetadata import org.apache.kafka.network.SocketServerConfigs -import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, MetadataVersion} import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig -import org.apache.kafka.server.quota -import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaType} import org.apache.kafka.storage.internals.log.UnifiedLog import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{Tag, Test, Timeout} @@ -58,8 +53,7 @@ import java.io.File import java.nio.charset.StandardCharsets import java.nio.file.{FileSystems, Files, Path, Paths} import java.{lang, util} -import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutionException, TimeUnit} -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{ExecutionException, TimeUnit} import java.util.{Optional, OptionalLong, Properties} import scala.collection.{Seq, mutable} import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS} @@ -1161,22 +1155,6 @@ class KRaftClusterTest { } } - @Test - def testAuthorizerFailureFoundInControllerStartup(): Unit = { - val cluster = new KafkaClusterTestKit.Builder( - new TestKitNodes.Builder(). - setNumControllerNodes(3).build()). - setConfigProp("authorizer.class.name", classOf[BadAuthorizer].getName).build() - try { - cluster.format() - val exception = assertThrows(classOf[ExecutionException], () => cluster.startup()) - assertEquals("java.lang.IllegalStateException: test authorizer exception", exception.getMessage) - cluster.fatalFaultHandler().setIgnore(true) - } finally { - cluster.close() - } - } - /** * Test a single broker, single controller cluster at the minimum bootstrap level. This tests * that we can function without having periodic NoOpRecords written. @@ -1197,89 +1175,6 @@ class KRaftClusterTest { } } - @ParameterizedTest - @ValueSource(booleans = Array(false, true)) - def testReconfigureControllerClientQuotas(combinedController: Boolean): Unit = { - val cluster = new KafkaClusterTestKit.Builder( - new TestKitNodes.Builder(). - setNumBrokerNodes(1). - setCombined(combinedController). - setNumControllerNodes(1).build()). - setConfigProp("client.quota.callback.class", classOf[DummyClientQuotaCallback].getName). - setConfigProp(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey, "0"). - build() - - def assertConfigValue(expected: Int): Unit = { - TestUtils.retry(60000) { - assertEquals(expected, cluster.controllers().values().iterator().next(). - quotaManagers.clientQuotaCallbackPlugin.get.get.asInstanceOf[DummyClientQuotaCallback].value) - assertEquals(expected, cluster.brokers().values().iterator().next(). - quotaManagers.clientQuotaCallbackPlugin.get.get.asInstanceOf[DummyClientQuotaCallback].value) - } - } - - try { - cluster.format() - cluster.startup() - cluster.waitForReadyBrokers() - assertConfigValue(0) - val admin = Admin.create(cluster.clientProperties()) - try { - admin.incrementalAlterConfigs( - util.Map.of(new ConfigResource(Type.BROKER, ""), - util.List.of(new AlterConfigOp( - new ConfigEntry(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey, "1"), OpType.SET)))). - all().get() - } finally { - admin.close() - } - assertConfigValue(1) - } finally { - cluster.close() - } - } - - @ParameterizedTest - @ValueSource(booleans = Array(false, true)) - def testReconfigureControllerAuthorizer(combinedMode: Boolean): Unit = { - val cluster = new KafkaClusterTestKit.Builder( - new TestKitNodes.Builder(). - setNumBrokerNodes(1). - setCombined(combinedMode). - setNumControllerNodes(1).build()). - setConfigProp("authorizer.class.name", classOf[FakeConfigurableAuthorizer].getName). - build() - - def assertFoobarValue(expected: Int): Unit = { - TestUtils.retry(60000) { - assertEquals(expected, cluster.controllers().values().iterator().next(). - authorizerPlugin.get.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get()) - assertEquals(expected, cluster.brokers().values().iterator().next(). - authorizerPlugin.get.get.asInstanceOf[FakeConfigurableAuthorizer].foobar.get()) - } - } - - try { - cluster.format() - cluster.startup() - cluster.waitForReadyBrokers() - assertFoobarValue(0) - val admin = Admin.create(cluster.clientProperties()) - try { - admin.incrementalAlterConfigs( - util.Map.of(new ConfigResource(Type.BROKER, ""), - util.List.of(new AlterConfigOp( - new ConfigEntry(FakeConfigurableAuthorizer.foobarConfigKey, "123"), OpType.SET)))). - all().get() - } finally { - admin.close() - } - assertFoobarValue(123) - } finally { - cluster.close() - } - } - @Test def testOverlyLargeCreateTopics(): Unit = { val cluster = new KafkaClusterTestKit.Builder( @@ -1718,125 +1613,3 @@ class KRaftClusterTest { } } } - -class BadAuthorizer extends Authorizer { - - override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = { - throw new IllegalStateException("test authorizer exception") - } - - override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = ??? - - override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = ??? - - override def close(): Unit = {} - - override def configure(configs: util.Map[String, _]): Unit = {} - - override def createAcls(requestContext: AuthorizableRequestContext, aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = ??? - - override def deleteAcls(requestContext: AuthorizableRequestContext, aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: CompletionStage[AclDeleteResult]] = ??? -} - -object DummyClientQuotaCallback { - val dummyClientQuotaCallbackValueConfigKey = "dummy.client.quota.callback.value" -} - -class DummyClientQuotaCallback extends ClientQuotaCallback with Reconfigurable { - var value = 0 - override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): util.Map[String, String] = util.Map.of - - override def quotaLimit(quotaType: ClientQuotaType, metricTags: util.Map[String, String]): lang.Double = 1.0 - - override def updateQuota(quotaType: ClientQuotaType, quotaEntity: quota.ClientQuotaEntity, newValue: Double): Unit = {} - - override def removeQuota(quotaType: ClientQuotaType, quotaEntity: quota.ClientQuotaEntity): Unit = {} - - override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = true - - override def updateClusterMetadata(cluster: Cluster): Boolean = false - - override def close(): Unit = {} - - override def configure(configs: util.Map[String, _]): Unit = { - val newValue = configs.get(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey) - if (newValue != null) { - value = Integer.parseInt(newValue.toString) - } - } - - override def reconfigurableConfigs(): util.Set[String] = util.Set.of(DummyClientQuotaCallback.dummyClientQuotaCallbackValueConfigKey) - - override def validateReconfiguration(configs: util.Map[String, _]): Unit = { - } - - override def reconfigure(configs: util.Map[String, _]): Unit = configure(configs) -} - -object FakeConfigurableAuthorizer { - val foobarConfigKey = "fake.configurable.authorizer.foobar.config" - - def fakeConfigurableAuthorizerConfigToInt(configs: util.Map[String, _]): Int = { - val result = configs.get(foobarConfigKey) - if (result == null) { - 0 - } else { - val resultString = result.toString.trim() - try { - Integer.valueOf(resultString) - } catch { - case _: NumberFormatException => throw new ConfigException(s"Bad value of $foobarConfigKey: $resultString") - } - } - } -} - -class FakeConfigurableAuthorizer extends Authorizer with Reconfigurable { - import FakeConfigurableAuthorizer._ - - val foobar = new AtomicInteger(0) - - override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = { - serverInfo.endpoints().asScala.map(e => e -> { - val future = new CompletableFuture[Void] - future.complete(null) - future - }).toMap.asJava - } - - override def reconfigurableConfigs(): util.Set[String] = util.Set.of(foobarConfigKey) - - override def validateReconfiguration(configs: util.Map[String, _]): Unit = { - fakeConfigurableAuthorizerConfigToInt(configs) - } - - override def reconfigure(configs: util.Map[String, _]): Unit = { - foobar.set(fakeConfigurableAuthorizerConfigToInt(configs)) - } - - override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = { - actions.asScala.map(_ => AuthorizationResult.ALLOWED).toList.asJava - } - - override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = util.List.of[AclBinding]() - - override def close(): Unit = {} - - override def configure(configs: util.Map[String, _]): Unit = { - foobar.set(fakeConfigurableAuthorizerConfigToInt(configs)) - } - - override def createAcls( - requestContext: AuthorizableRequestContext, - aclBindings: util.List[AclBinding] - ): util.List[_ <: CompletionStage[AclCreateResult]] = { - util.List.of - } - - override def deleteAcls( - requestContext: AuthorizableRequestContext, - aclBindingFilters: util.List[AclBindingFilter] - ): util.List[_ <: CompletionStage[AclDeleteResult]] = { - util.List.of - } -} diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java new file mode 100644 index 0000000000000..f00b0e874758c --- /dev/null +++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigOp.OpType; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.Reconfigurable; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.ConfigResource.Type; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.test.KafkaClusterTestKit; +import org.apache.kafka.common.test.TestKitNodes; +import org.apache.kafka.server.authorizer.AclCreateResult; +import org.apache.kafka.server.authorizer.AclDeleteResult; +import org.apache.kafka.server.authorizer.Action; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; +import org.apache.kafka.server.authorizer.AuthorizationResult; +import org.apache.kafka.server.authorizer.Authorizer; +import org.apache.kafka.server.authorizer.AuthorizerServerInfo; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class KRaftClusterTest { + + @Test + public void testAuthorizerFailureFoundInControllerStartup() throws Exception { + try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumControllerNodes(3).build()). + setConfigProp("authorizer.class.name", BadAuthorizer.class.getName()).build()) { + cluster.format(); + ExecutionException exception = assertThrows(ExecutionException.class, + cluster::startup); + assertEquals("java.lang.IllegalStateException: test authorizer exception", + exception.getMessage()); + cluster.fatalFaultHandler().setIgnore(true); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testReconfigureControllerClientQuotas(boolean combinedController) throws Exception { + try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder() + .setNumBrokerNodes(1) + .setCombined(combinedController) + .setNumControllerNodes(1) + .build()) + .setConfigProp("client.quota.callback.class", DummyClientQuotaCallback.class.getName()) + .setConfigProp(DummyClientQuotaCallback.DUMMY_CLIENT_QUOTA_CALLBACK_VALUE_CONFIG_KEY, "0") + .build()) { + cluster.format(); + cluster.startup(); + cluster.waitForReadyBrokers(); + assertConfigValue(cluster, 0); + + try (Admin admin = Admin.create(cluster.clientProperties())) { + admin.incrementalAlterConfigs( + Map.of(new ConfigResource(Type.BROKER, ""), + List.of(new AlterConfigOp( + new ConfigEntry(DummyClientQuotaCallback.DUMMY_CLIENT_QUOTA_CALLBACK_VALUE_CONFIG_KEY, "1"), OpType.SET)))) + .all().get(); + } + assertConfigValue(cluster, 1); + } + } + + private void assertConfigValue(KafkaClusterTestKit cluster, int expected) throws InterruptedException { + TestUtils.retryOnExceptionWithTimeout(60000, () -> { + Object controllerCallback = cluster.controllers().values().iterator().next() + .quotaManagers().clientQuotaCallbackPlugin().get().get(); + assertEquals(expected, ((DummyClientQuotaCallback) controllerCallback).value); + + Object brokerCallback = cluster.brokers().values().iterator().next() + .quotaManagers().clientQuotaCallbackPlugin().get().get(); + assertEquals(expected, ((DummyClientQuotaCallback) brokerCallback).value); + }); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testReconfigureControllerAuthorizer(boolean combinedMode) throws Exception { + try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder() + .setNumBrokerNodes(1) + .setCombined(combinedMode) + .setNumControllerNodes(1) + .build()) + .setConfigProp("authorizer.class.name", FakeConfigurableAuthorizer.class.getName()) + .build()) { + cluster.format(); + cluster.startup(); + cluster.waitForReadyBrokers(); + + assertFoobarValue(cluster, 0); + + try (Admin admin = Admin.create(cluster.clientProperties())) { + admin.incrementalAlterConfigs( + Map.of(new ConfigResource(Type.BROKER, ""), + List.of(new AlterConfigOp( + new ConfigEntry(FakeConfigurableAuthorizer.FOOBAR_CONFIG_KEY, "123"), OpType.SET)))) + .all().get(); + } + + assertFoobarValue(cluster, 123); + } + } + + private void assertFoobarValue(KafkaClusterTestKit cluster, int expected) throws InterruptedException { + TestUtils.retryOnExceptionWithTimeout(60000, () -> { + // 获取控制器的值 + Object controllerAuthorizer = cluster.controllers().values().iterator().next() + .authorizerPlugin().get().get(); + assertEquals(expected, ((FakeConfigurableAuthorizer) controllerAuthorizer).foobar.get()); + + // 获取Broker的值 + Object brokerAuthorizer = cluster.brokers().values().iterator().next() + .authorizerPlugin().get().get(); + assertEquals(expected, ((FakeConfigurableAuthorizer) brokerAuthorizer).foobar.get()); + }); + } + + public static class BadAuthorizer implements Authorizer { + // Default constructor needed for reflection object creation + public BadAuthorizer() { + } + + @Override + public Map> start(AuthorizerServerInfo serverInfo) { + throw new IllegalStateException("test authorizer exception"); + } + + @Override + public List authorize(AuthorizableRequestContext requestContext, List actions) { + return null; + } + + @Override + public List> createAcls(AuthorizableRequestContext requestContext, + List aclBindings) { + return null; + } + + @Override + public List> deleteAcls(AuthorizableRequestContext requestContext, + List aclBindingFilters) { + return null; + } + + @Override + public Iterable acls(AclBindingFilter filter) { + return null; + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map configs) { + } + } + + public static class DummyClientQuotaCallback implements ClientQuotaCallback, Reconfigurable { + // Default constructor needed for reflection object creation + public DummyClientQuotaCallback() { + } + + public static final String DUMMY_CLIENT_QUOTA_CALLBACK_VALUE_CONFIG_KEY = "dummy.client.quota.callback.value"; + + private int value = 0; + + @Override + public Map quotaMetricTags(ClientQuotaType quotaType, KafkaPrincipal principal, String clientId) { + return Collections.emptyMap(); + } + + @Override + public Double quotaLimit(ClientQuotaType quotaType, Map metricTags) { + return 1.0; + } + + @Override + public void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity, double newValue) { + } + + @Override + public void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity quotaEntity) { + } + + @Override + public boolean quotaResetRequired(ClientQuotaType quotaType) { + return true; + } + + @Override + public boolean updateClusterMetadata(Cluster cluster) { + return false; + } + + @Override + public void close() { + } + + @Override + public void configure(Map configs) { + Object newValue = configs.get(DUMMY_CLIENT_QUOTA_CALLBACK_VALUE_CONFIG_KEY); + if (newValue != null) { + value = Integer.parseInt(newValue.toString()); + } + } + + @Override + public Set reconfigurableConfigs() { + return Collections.singleton(DUMMY_CLIENT_QUOTA_CALLBACK_VALUE_CONFIG_KEY); + } + + @Override + public void validateReconfiguration(Map configs) { + } + + @Override + public void reconfigure(Map configs) { + configure(configs); + } + } + + public static class FakeConfigurableAuthorizer implements Authorizer, Reconfigurable { + // Default constructor needed for reflection object creation + public FakeConfigurableAuthorizer() { + } + + public static final String FOOBAR_CONFIG_KEY = "fake.configurable.authorizer.foobar.config"; + + private final AtomicInteger foobar = new AtomicInteger(0); + + @Override + public Map> start(AuthorizerServerInfo serverInfo) { + return serverInfo.endpoints().stream() + .collect(Collectors.toMap( + endpoint -> endpoint, + endpoint -> { + CompletableFuture future = new CompletableFuture<>(); + future.complete(null); + return future; + } + )); + } + + @Override + public Set reconfigurableConfigs() { + return Collections.singleton(FOOBAR_CONFIG_KEY); + } + + @Override + public void validateReconfiguration(Map configs) { + fakeConfigurableAuthorizerConfigToInt(configs); + } + + @Override + public void reconfigure(Map configs) { + foobar.set(fakeConfigurableAuthorizerConfigToInt(configs)); + } + + @Override + public List authorize(AuthorizableRequestContext requestContext, + List actions + ) { + return actions.stream() + .map(action -> AuthorizationResult.ALLOWED) + .collect(Collectors.toList()); + } + + @Override + public Iterable acls(AclBindingFilter filter) { + return Collections.emptyList(); + } + + @Override + public void close() { + } + + @Override + public void configure(Map configs) { + foobar.set(fakeConfigurableAuthorizerConfigToInt(configs)); + } + + @Override + public List> createAcls( + AuthorizableRequestContext requestContext, + List aclBindings + ) { + return Collections.emptyList(); + } + + @Override + public List> deleteAcls( + AuthorizableRequestContext requestContext, + List aclBindingFilters + ) { + return Collections.emptyList(); + } + + private int fakeConfigurableAuthorizerConfigToInt(Map configs) { + Object value = configs.get(FOOBAR_CONFIG_KEY); + if (value == null) { + return 0; + } else { + return Integer.parseInt(value.toString()); + } + } + } +} From bf03e5de096a10940c5ef14ea8214e2ce3283f60 Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Fri, 17 Oct 2025 09:56:39 +0800 Subject: [PATCH 2/3] remove unused import --- .../test/scala/integration/kafka/server/KRaftClusterTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 2e4487b06f9e2..1e099f472df5b 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -22,7 +22,7 @@ import kafka.utils.TestUtils import org.apache.kafka.server.IntegrationTestUtils.connectAndReceive import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin._ -import org.apache.kafka.common.config.{ConfigException, ConfigResource} +import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource.Type import org.apache.kafka.common.errors.{InvalidPartitionsException, PolicyViolationException, UnsupportedVersionException} import org.apache.kafka.common.message.DescribeClusterRequestData From 66bd5093a0bba19ef982e61b743156b24e4cd040 Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Fri, 17 Oct 2025 10:16:38 +0800 Subject: [PATCH 3/3] remove --- .../src/test/java/org/apache/kafka/server/KRaftClusterTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java index f00b0e874758c..d6de1fa3d8ca8 100644 --- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java +++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java @@ -148,12 +148,10 @@ public void testReconfigureControllerAuthorizer(boolean combinedMode) throws Exc private void assertFoobarValue(KafkaClusterTestKit cluster, int expected) throws InterruptedException { TestUtils.retryOnExceptionWithTimeout(60000, () -> { - // 获取控制器的值 Object controllerAuthorizer = cluster.controllers().values().iterator().next() .authorizerPlugin().get().get(); assertEquals(expected, ((FakeConfigurableAuthorizer) controllerAuthorizer).foobar.get()); - // 获取Broker的值 Object brokerAuthorizer = cluster.brokers().values().iterator().next() .authorizerPlugin().get().get(); assertEquals(expected, ((FakeConfigurableAuthorizer) brokerAuthorizer).foobar.get());