diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index f0b077b17f1c8..8d0d9edb857c5 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -44,7 +44,7 @@ import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.KafkaRaftClient import org.apache.kafka.server.{DynamicThreadPool, ProcessRole} import org.apache.kafka.server.common.ApiMessageAndVersion -import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} +import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs} import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider} @@ -104,7 +104,8 @@ object DynamicBrokerConfig { DynamicReplicationConfig.ReconfigurableConfigs ++ Set(AbstractConfig.CONFIG_PROVIDERS_CONFIG) ++ GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala ++ - ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala + ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala ++ + QuotaConfig.brokerQuotaConfigsNames().asScala private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG) private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff( diff --git a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java index 8e89d1a28c071..82c31ac2ec1e3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfig.java @@ -225,6 +225,10 @@ public static ConfigDef brokerQuotaConfigs() { ConfigDef.Importance.MEDIUM, QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_DOC); } + public static Set brokerQuotaConfigsNames() { + return Set.copyOf(brokerQuotaConfigs().names()); + } + public static ConfigDef userAndClientQuotaConfigs() { ConfigDef configDef = new ConfigDef(); buildUserClientQuotaConfigDef(configDef); diff --git a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java index a97a4cb3bed5c..e398394a5137c 100644 --- a/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java @@ -62,6 +62,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer; import org.apache.kafka.network.SocketServerConfigs; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.QuotaConfig; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Timeout; @@ -73,6 +74,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG; import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG; @@ -82,6 +84,7 @@ import static org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -431,4 +434,28 @@ public void testIncrementalAlterConfigsBySingleControllerWithDynamicQuorum(Clust public void testIncrementalAlterConfigsByAllControllersWithDynamicQuorum(ClusterInstance clusterInstance) throws Exception { testIncrementalAlterConfigs(clusterInstance, true); } + + @ClusterTest + public void testQuotaConfigsIsReadOnlyShouldBeFalse(ClusterInstance clusterInstance) throws Exception { + try (Admin admin = Admin.create(adminConfig(clusterInstance, true))) { + int nodeId = clusterInstance.controllers().values().iterator().next().config().nodeId(); + ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId); + Map> alterations = Map.of( + nodeResource, List.of( + new AlterConfigOp(new ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, "16800"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, "16800"), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, "16800"), AlterConfigOp.OpType.SET) + )); + admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES); + TestUtils.retryOnExceptionWithTimeout(30_000, () -> { + Config config = admin.describeConfigs(List.of(nodeResource)). + all().get(1, TimeUnit.MINUTES).get(nodeResource); + Map configEntries = config.entries().stream() + .collect(Collectors.toMap(ConfigEntry::name, e -> e)); + assertFalse(configEntries.get(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG).isReadOnly()); + assertFalse(configEntries.get(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG).isReadOnly()); + assertFalse(configEntries.get(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG).isReadOnly()); + }); + } + } }