diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 93624adc6f3dd..3eb8bf59b491d 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1105,6 +1105,8 @@ class KafkaConfigTest { case RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => // ignore string case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) + case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2) + case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 02f3d3a286024..8453d625deb02 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import java.util.Collections; import java.util.Map; @@ -93,20 +94,29 @@ public final class RemoteLogManagerConfig { public static final long DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L; public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = "remote.log.manager.thread.pool.size"; - public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks to copy " + + public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Deprecated. Size of the thread pool used in scheduling tasks to copy " + "segments, fetch remote log indexes and clean up remote log segments."; public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10; + private static final String REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK = "The default value of -1 means that this will be set to the configured value of " + + REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP + ", if available; otherwise, it defaults to " + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE + "."; + private static final ConfigDef.Validator REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR = ConfigDef.LambdaValidator.with( + (name, value) -> { + if ((int) value < -1 || (int) value == 0) throw new ConfigException(name, value, "Value can be -1 or greater than 0"); + }, + () -> REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK + ); + public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP = "remote.log.manager.copier.thread.pool.size"; - public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in " + - "scheduling tasks to copy segments."; - public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = 10; + public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " + + "to copy segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK; + public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = -1; public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = "remote.log.manager.expiration.thread.pool.size"; - public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in" + - " scheduling tasks to clean up remote log segments."; - public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10; - + public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " + + "to clean up remote log segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK; + public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = -1; + public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = "remote.log.manager.task.interval.ms"; public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = "Interval at which remote log manager runs the scheduled tasks like copy " + "segments, and clean up remote log segments."; @@ -257,16 +267,16 @@ public static ConfigDef configDef() { atLeast(1), MEDIUM, REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC) - .defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, + .define(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, INT, DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE, - atLeast(1), + REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR, MEDIUM, REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC) - .defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, + .define(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, INT, DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE, - atLeast(1), + REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR, MEDIUM, REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC) .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, @@ -395,11 +405,13 @@ public int remoteLogManagerThreadPoolSize() { } public int remoteLogManagerCopierThreadPoolSize() { - return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); + int size = config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP); + return size == -1 ? remoteLogManagerThreadPoolSize() : size; } public int remoteLogManagerExpirationThreadPoolSize() { - return config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); + int size = config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP); + return size == -1 ? remoteLogManagerThreadPoolSize() : size; } public long remoteLogManagerTaskIntervalMs() { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index cb28f71a45609..7ce0c46a5156b 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -29,7 +29,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; public class RemoteLogManagerConfigTest { - @Test public void testValidConfigs() { String rsmPrefix = "__custom.rsm."; @@ -56,6 +55,16 @@ public void testDefaultConfigs() { assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples()); } + @Test + public void testThreadPoolDefaults() { + // Even with empty properties, RemoteLogManagerConfig has default values + Map emptyProps = new HashMap<>(); + RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig(); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize()); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize()); + assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize()); + } + @Test public void testValidateEmptyStringConfig() { // Test with a empty string props should throw ConfigException @@ -65,7 +74,6 @@ public void testValidateEmptyStringConfig() { } private Map getRLMProps(String rsmPrefix, String rlmmPrefix) { - Map props = new HashMap<>(); props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true); props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, @@ -108,7 +116,6 @@ private Map getRLMProps(String rsmPrefix, String rlmmPrefix) { } private static class RLMTestConfig extends AbstractConfig { - private final RemoteLogManagerConfig rlmConfig; public RLMTestConfig(Map originals) { @@ -120,4 +127,4 @@ public RemoteLogManagerConfig remoteLogManagerConfig() { return rlmConfig; } } -} \ No newline at end of file +}