proof of concept test for cache cleanup parallelization
Signed-off-by: Peter Alfonsi <[email protected]>
Peter Alfonsi committed Oct 1, 2024
1 parent f3232cb commit 3dee239
Showing 1 changed file with 24 additions and 3 deletions.
@@ -84,6 +84,9 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
@@ -514,6 +517,10 @@ class IndicesRequestCacheCleanupManager implements Closeable {
         private volatile double stalenessThreshold;
         private final IndicesRequestCacheCleaner cacheCleaner;
 
+        // TODO: Proof of concept; may need to use OpenSearch thread pools passed in from Node.java instead.
+        private static final int numCleanupKeyThreads = 3;
+        // private final ThreadPoolExecutor cleanupKeyRemovalPool;
+
         IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
             this.stalenessThreshold = stalenessThreshold;
             this.keysToClean = ConcurrentCollections.newConcurrentSet();
@@ -721,6 +728,7 @@ private synchronized void cleanCache(double stalenessThreshold) {
             if (canSkipCacheCleanup(stalenessThreshold)) {
                 return;
             }
+            ThreadPoolExecutor removalPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(numCleanupKeyThreads);
             // Contains CleanupKey objects with open shard but invalidated readerCacheKeyId.
             final Set<CleanupKey> cleanupKeysFromOutdatedReaders = new HashSet<>();
             // Contains CleanupKey objects for a full cache cleanup.
@@ -755,18 +763,21 @@ private synchronized void cleanCache(double stalenessThreshold) {
                 Key delegatingKey = key.key;
                 Tuple<ShardId, Integer> shardIdInfo = new Tuple<>(delegatingKey.shardId, delegatingKey.indexShardHashCode);
                 if (cleanupKeysFromFullClean.contains(shardIdInfo) || cleanupKeysFromClosedShards.contains(shardIdInfo)) {
-                    iterator.remove();
+                    // iterator.remove();
+                    removeKey(removalPool, key);
                 } else {
                     CacheEntity cacheEntity = cacheEntityLookup.apply(delegatingKey.shardId).orElse(null);
                     if (cacheEntity == null) {
                         // If cache entity is null, it means that index or shard got deleted/closed meanwhile.
                         // So we will delete this key.
                         dimensionListsToDrop.add(key.dimensions);
-                        iterator.remove();
+                        // iterator.remove();
+                        removeKey(removalPool, key);
                     } else {
                         CleanupKey cleanupKey = new CleanupKey(cacheEntity, delegatingKey.readerCacheKeyId);
                         if (cleanupKeysFromOutdatedReaders.contains(cleanupKey)) {
-                            iterator.remove();
+                            // iterator.remove();
+                            removeKey(removalPool, key);
                         }
                     }
                 }
@@ -783,9 +794,19 @@ private synchronized void cleanCache(double stalenessThreshold) {
                 dummyKey.setDropStatsForDimensions(true);
                 cache.invalidate(dummyKey);
             }
+            // TODO: Proof of concept only.
+            // How should completion of the queued removals be awaited? For now, shut the pool down and wait.
+            removalPool.shutdown();
+            try {
+                removalPool.awaitTermination(1, TimeUnit.MINUTES);
+            } catch (InterruptedException ignored) {}
             cache.refresh();
         }
 
+        private void removeKey(ThreadPoolExecutor pool, ICacheKey<Key> key) {
+            pool.execute(() -> cache.invalidate(key));
+        }
+
         /**
          * Determines whether the cache cleanup process can be skipped based on the staleness threshold.
          *
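
The two TODOs in this change ask whether the pool should come from the thread pools wired up in Node.java and how to await the queued removals before cache.refresh(). Below is a minimal, self-contained sketch of the awaiting half using only java.util.concurrent; the class name ParallelRemovalSketch, the cacheKeys set, and the endsWith "staleness" filter are illustrative stand-ins, not the actual OpenSearch cache types. It collects the Futures returned by submit() and blocks on them, rather than relying only on shutdown()/awaitTermination().

// Illustrative sketch, not the OpenSearch code: parallelize key invalidation with a small
// fixed pool, then await the returned Futures before the step that corresponds to cache.refresh().
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelRemovalSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for the request cache's key set; removal must be thread-safe.
        Set<String> cacheKeys = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < 100; i++) {
            cacheKeys.add("key-" + i);
        }

        ExecutorService removalPool = Executors.newFixedThreadPool(3);
        List<Future<?>> pendingRemovals = new ArrayList<>();
        for (String key : cacheKeys) {
            if (key.endsWith("7")) { // pretend these keys were flagged as stale
                pendingRemovals.add(removalPool.submit(() -> cacheKeys.remove(key)));
            }
        }

        // Block until every queued removal has run; only then is it safe to "refresh".
        for (Future<?> pendingRemoval : pendingRemovals) {
            pendingRemoval.get();
        }
        removalPool.shutdown();

        System.out.println("keys left after parallel cleanup: " + cacheKeys.size());
    }
}

If the pool should instead come from the node, OpenSearch's ThreadPool exposes named executors (for example, threadPool.executor(ThreadPool.Names.GENERIC)), which would avoid building a new fixed pool on every cleanup pass; whether a dedicated executor is warranted is left open by the TODO above.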
