Skip to content

Commit

Permalink
[FLINK-37166] Use concurrenthashmap in flink config manager
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Jan 20, 2025
1 parent cd55bdc commit 4c2c90c
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -87,7 +87,7 @@ public class FlinkConfigManager {
private final AtomicLong defaultConfigVersion = new AtomicLong(0);
private final LoadingCache<Key, Configuration> cache;
private final Consumer<Set<String>> namespaceListener;
private volatile Map<FlinkVersion, List<String>> relevantFlinkVersionPrefixes;
private volatile ConcurrentHashMap<FlinkVersion, List<String>> relevantFlinkVersionPrefixes;

protected static final Pattern FLINK_VERSION_PATTERN =
Pattern.compile(
Expand All @@ -114,7 +114,7 @@ public FlinkConfigManager(
this.namespaceListener = namespaceListener;
Duration cacheTimeout =
defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT);
this.relevantFlinkVersionPrefixes = new HashMap<>();
this.relevantFlinkVersionPrefixes = new ConcurrentHashMap<>();
this.cache =
CacheBuilder.newBuilder()
.maximumSize(
Expand Down Expand Up @@ -189,7 +189,7 @@ public void updateDefaultConfig(Configuration newConf) {
// We clear the cached relevant Flink version prefixes as the base config may include new
// version overrides.
// This will trigger a regeneration of the prefixes in the next call to getDefaultConfig.
relevantFlinkVersionPrefixes = new HashMap<>();
relevantFlinkVersionPrefixes = new ConcurrentHashMap<>();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;

import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES;
Expand Down Expand Up @@ -380,4 +381,44 @@ public void testVersionNamespaceDefaultConfs() {
assertEquals("v3", observeConfig.get("conf3"));
assertEquals("false", observeConfig.get("conf0"));
}

@Test
public void testConcurrentDefaultConfig() throws InterruptedException {
var opConf = new Configuration();
var configManager = new FlinkConfigManager(opConf);
var completed1 = new AtomicBoolean();
var completed2 = new AtomicBoolean();
var completed3 = new AtomicBoolean();

var t1 =
new Thread(
() -> {
configManager.getDefaultConfig("ns1", FlinkVersion.v1_18);
completed1.set(true);
});
var t2 =
new Thread(
() -> {
configManager.getDefaultConfig("ns1", FlinkVersion.v1_18);
completed2.set(true);
});
var t3 =
new Thread(
() -> {
configManager.getDefaultConfig("ns1", FlinkVersion.v1_18);
completed3.set(true);
});

t1.start();
t2.start();
t3.start();

t1.join();
t2.join();
t3.join();

assertTrue(completed1.get());
assertTrue(completed2.get());
assertTrue(completed3.get());
}
}

0 comments on commit 4c2c90c

Please sign in to comment.