Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs}
import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider}
Expand Down Expand Up @@ -104,7 +104,8 @@ object DynamicBrokerConfig {
DynamicReplicationConfig.ReconfigurableConfigs ++
Set(AbstractConfig.CONFIG_PROVIDERS_CONFIG) ++
GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala ++
ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala
ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala ++
QuotaConfig.brokerQuotaConfigsNames().asScala

private val ClusterLevelListenerConfigs = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ public static ConfigDef brokerQuotaConfigs() {
ConfigDef.Importance.MEDIUM, QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_DOC);
}

public static Set<String> brokerQuotaConfigsNames() {
return Set.of(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using brokerQuotaConfigs?

    public static Set<String> brokerQuotaConfigsNames() {
        return Set.copyOf(brokerQuotaConfigs().names());
    }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, addressed it.

LEADER_REPLICATION_THROTTLED_RATE_CONFIG,
FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG,
REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG
);
}

public static ConfigDef userAndClientQuotaConfigs() {
ConfigDef configDef = new ConfigDef();
buildUserClientQuotaConfigDef(configDef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.QuotaConfig;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.Timeout;
Expand All @@ -73,6 +74,7 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
Expand All @@ -82,6 +84,7 @@
import static org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -431,4 +434,28 @@ public void testIncrementalAlterConfigsBySingleControllerWithDynamicQuorum(Clust
public void testIncrementalAlterConfigsByAllControllersWithDynamicQuorum(ClusterInstance clusterInstance) throws Exception {
testIncrementalAlterConfigs(clusterInstance, true);
}

@ClusterTest
public void testQuotaConfigsIsReadOnlyShouldBeFalse(ClusterInstance clusterInstance) throws Exception {
try (Admin admin = Admin.create(adminConfig(clusterInstance, true))) {
int nodeId = clusterInstance.controllers().values().iterator().next().config().nodeId();
ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId);
Map<ConfigResource, Collection<AlterConfigOp>> alterations = Map.of(
nodeResource, List.of(
new AlterConfigOp(new ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, "16800"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, "16800"), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, "16800"), AlterConfigOp.OpType.SET)
));
admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES);
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
Config config = admin.describeConfigs(List.of(nodeResource)).
all().get(1, TimeUnit.MINUTES).get(nodeResource);
Map<String, ConfigEntry> configEntries = config.entries().stream()
.collect(Collectors.toMap(ConfigEntry::name, e -> e));
assertFalse(configEntries.get(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG).isReadOnly());
assertFalse(configEntries.get(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG).isReadOnly());
assertFalse(configEntries.get(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG).isReadOnly());
});
}
}
}