Skip to content

Commit

Permalink
KAFKA-17031: Make RLM thread pool configurations public and fix defau…
Browse files Browse the repository at this point in the history
…lt handling (#17499)

According to KIP-950, remote.log.manager.thread.pool.size should be marked as deprecated and replaced by two new configurations: remote.log.manager.copier.thread.pool.size and remote.log.manager.expiration.thread.pool.size. Fix default handling so that -1 works as expected.

Reviewers: Luke Chen <[email protected]>, Gaurav Narula <[email protected]>, Satish Duggana <[email protected]>, Colin P. McCabe <[email protected]>
  • Loading branch information
fvaleri authored Oct 21, 2024
1 parent e3751a8 commit 84ab3b9
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 18 deletions.
2 changes: 2 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,8 @@ class KafkaConfigTest {
case RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP => // ignore string
case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -2)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -93,20 +94,29 @@ public final class RemoteLogManagerConfig {
public static final long DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES = 1024 * 1024 * 1024L;

public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP = "remote.log.manager.thread.pool.size";
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks to copy " +
public static final String REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC = "Deprecated. Size of the thread pool used in scheduling tasks to copy " +
"segments, fetch remote log indexes and clean up remote log segments.";
public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;

private static final String REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK = "The default value of -1 means that this will be set to the configured value of " +
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP + ", if available; otherwise, it defaults to " + DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE + ".";
private static final ConfigDef.Validator REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR = ConfigDef.LambdaValidator.with(
(name, value) -> {
if ((int) value < -1 || (int) value == 0) throw new ConfigException(name, value, "Value can be -1 or greater than 0");
},
() -> REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK
);

public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP = "remote.log.manager.copier.thread.pool.size";
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in " +
"scheduling tasks to copy segments.";
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = 10;
public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
"to copy segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = -1;

public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = "remote.log.manager.expiration.thread.pool.size";
public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in" +
" scheduling tasks to clean up remote log segments.";
public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10;

public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in scheduling tasks " +
"to clean up remote log segments. " + REMOTE_LOG_MANAGER_THREAD_POOL_FALLBACK;
public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = -1;
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = "remote.log.manager.task.interval.ms";
public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = "Interval at which remote log manager runs the scheduled tasks like copy " +
"segments, and clean up remote log segments.";
Expand Down Expand Up @@ -257,16 +267,16 @@ public static ConfigDef configDef() {
atLeast(1),
MEDIUM,
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC)
.defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
.define(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
INT,
DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
atLeast(1),
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
MEDIUM,
REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC)
.defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
.define(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
INT,
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
atLeast(1),
REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_VALIDATOR,
MEDIUM,
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
.define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
Expand Down Expand Up @@ -395,11 +405,13 @@ public int remoteLogManagerThreadPoolSize() {
}

public int remoteLogManagerCopierThreadPoolSize() {
return config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
int size = config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP);
return size == -1 ? remoteLogManagerThreadPoolSize() : size;
}

public int remoteLogManagerExpirationThreadPoolSize() {
return config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
int size = config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP);
return size == -1 ? remoteLogManagerThreadPoolSize() : size;
}

public long remoteLogManagerTaskIntervalMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;

public class RemoteLogManagerConfigTest {

@Test
public void testValidConfigs() {
String rsmPrefix = "__custom.rsm.";
Expand All @@ -56,6 +55,16 @@ public void testDefaultConfigs() {
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopyNumQuotaSamples());
}

@Test
public void testThreadPoolDefaults() {
// Even with empty properties, RemoteLogManagerConfig has default values
Map<String, Object> emptyProps = new HashMap<>();
RemoteLogManagerConfig remoteLogManagerConfigEmptyConfig = new RLMTestConfig(emptyProps).remoteLogManagerConfig();
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerCopierThreadPoolSize());
assertEquals(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE, remoteLogManagerConfigEmptyConfig.remoteLogManagerExpirationThreadPoolSize());
}

@Test
public void testValidateEmptyStringConfig() {
// Test with a empty string props should throw ConfigException
Expand All @@ -65,7 +74,6 @@ public void testValidateEmptyStringConfig() {
}

private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) {

Map<String, Object> props = new HashMap<>();
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
Expand Down Expand Up @@ -108,7 +116,6 @@ private Map<String, Object> getRLMProps(String rsmPrefix, String rlmmPrefix) {
}

private static class RLMTestConfig extends AbstractConfig {

private final RemoteLogManagerConfig rlmConfig;

public RLMTestConfig(Map<?, ?> originals) {
Expand All @@ -120,4 +127,4 @@ public RemoteLogManagerConfig remoteLogManagerConfig() {
return rlmConfig;
}
}
}
}

0 comments on commit 84ab3b9

Please sign in to comment.