From c06cdf275e14b4a32e29fb8dc6689d898a65ed41 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 3 Jun 2024 16:58:02 -0700 Subject: [PATCH 01/10] Moving query recompute out of write lock Signed-off-by: Sagar Upadhyaya --- .../common/tier/TieredSpilloverCache.java | 66 +++++++++++++++++-- 1 file changed, 62 insertions(+), 4 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index f40c35dde83de..2377c2b17f829 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -35,9 +35,13 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.ToLongBiFunction; @@ -86,6 +90,8 @@ public class TieredSpilloverCache implements ICache { private final Map, TierInfo> caches; private final List> policies; + Map, CompletableFuture, V>>> completableFutureMap = new ConcurrentHashMap<>(); + TieredSpilloverCache(Builder builder) { Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null"); Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null"); @@ -190,10 +196,7 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside. // This is needed as there can be many requests for the same key at the same time and we only want to load // the value once. - V value = null; - try (ReleasableLock ignore = writeLock.acquire()) { - value = onHeapCache.computeIfAbsent(key, loader); - } + V value = compute(key, loader); // Handle stats if (loader.isLoaded()) { // The value was just computed and added to the cache by this thread. Register a miss for the heap cache, and the disk cache @@ -222,6 +225,61 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> return cacheValueTuple.v1(); } + private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { + // A future that returns a pair of key/value. + CompletableFuture, V>> completableFuture = new CompletableFuture<>(); + // Only one of the threads will succeed putting a future into map for the same key. + // Rest will fetch existing future. + CompletableFuture, V>> future = completableFutureMap.putIfAbsent(key, completableFuture); + // Handler to handle results post processing. Takes a tuple or exception as an input and returns + // the value. Also before returning value, puts the value in cache. + BiFunction, V>, Throwable, V> handler = (pair, ex) -> { + V value = null; + if (pair != null) { + try (ReleasableLock ignore = writeLock.acquire()) { + onHeapCache.put(pair.v1(), pair.v2()); + } + value = pair.v2(); // Returning a value itself assuming that a next get should return the same. Should + // be safe to assume if we got no exception and reached here. + } + completableFutureMap.remove(key); // Remove key from map as not needed anymore. + return value; + }; + CompletableFuture completableValue; + if (future == null) { + future = completableFuture; + completableValue = future.handle(handler); + V value; + try { + value = loader.load(key); + } catch (Exception ex) { + future.completeExceptionally(ex); + throw new ExecutionException(ex); + } + if (value == null) { + NullPointerException npe = new NullPointerException("loader returned a null value"); + future.completeExceptionally(npe); + throw new ExecutionException(npe); + } else { + future.complete(new Tuple<>(key, value)); + } + + } else { + completableValue = future.handle(handler); + } + V value; + try { + value = completableValue.get(); + if (future.isCompletedExceptionally()) { + future.get(); // call get to force the exception to be thrown for other concurrent callers + throw new IllegalStateException("Future completed exceptionally but no error thrown"); + } + } catch (InterruptedException ex) { + throw new IllegalStateException(ex); + } + return value; + } + @Override public void invalidate(ICacheKey key) { // We are trying to invalidate the key from all caches though it would be present in only of them. From b0f655067594ebaca86295f20cde5528ac9236dc Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 11 Jun 2024 09:46:03 -0700 Subject: [PATCH 02/10] [Tiered Caching] Moving query recomputation logic outside of write lock Signed-off-by: Sagar Upadhyaya --- .../common/tier/TieredSpilloverCache.java | 4 +- .../tier/TieredSpilloverCacheTests.java | 233 +++++++++++++++++- 2 files changed, 234 insertions(+), 3 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 2377c2b17f829..e70a7dffc4c02 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -257,7 +257,7 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader throw new ExecutionException(ex); } if (value == null) { - NullPointerException npe = new NullPointerException("loader returned a null value"); + NullPointerException npe = new NullPointerException("Loader returned a null value"); future.completeExceptionally(npe); throw new ExecutionException(npe); } else { @@ -271,7 +271,7 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader try { value = completableValue.get(); if (future.isCompletedExceptionally()) { - future.get(); // call get to force the exception to be thrown for other concurrent callers + future.get(); // call get to force the same exception to be thrown for other concurrent callers throw new IllegalStateException("Future completed exceptionally but no error thrown"); } } catch (InterruptedException ex) { diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 6c49341591589..90652ff5197ed 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -44,8 +44,12 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; @@ -408,6 +412,7 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { assertEquals(onHeapCacheHit, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); assertEquals(cacheMiss + numOfItems1, getMissesForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_DISK)); assertEquals(diskCacheHit, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_DISK)); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); } public void testComputeIfAbsentWithEvictionsFromTieredCache() throws Exception { @@ -811,7 +816,7 @@ public String load(ICacheKey key) { threads[i].start(); } phaser.arriveAndAwaitAdvance(); - countDownLatch.await(); // Wait for rest of tasks to be cancelled. + countDownLatch.await(); int numberOfTimesKeyLoaded = 0; assertEquals(numberOfSameKeys, loadAwareCacheLoaderList.size()); for (int i = 0; i < loadAwareCacheLoaderList.size(); i++) { @@ -824,6 +829,232 @@ public String load(ICacheKey key) { // We should see only one heap miss, and the rest hits assertEquals(1, getMissesForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); assertEquals(numberOfSameKeys - 1, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + } + + public void testComputIfAbsentConcurrentlyWithMultipleKeys() throws Exception { + int onHeapCacheSize = randomIntBetween(300, 500); + int diskCacheSize = randomIntBetween(600, 700); + int keyValueSize = 50; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + Settings settings = Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .build(); + + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + settings, + 0 + ); + + int iterations = 10; + int numberOfKeys = 20; + List> iCacheKeyList = new ArrayList<>(); + for (int i = 0; i< numberOfKeys; i++) { + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + iCacheKeyList.add(key); + } + ExecutorService executorService = Executors.newFixedThreadPool(8); + CountDownLatch countDownLatch = new CountDownLatch(iterations*numberOfKeys); // To wait for all threads to finish. + + List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + + for (int i = 0; i < iterations; i++) { + for (int j = 0; j < numberOfKeys; j++) { + int finalJ = j; + executorService.submit(() -> { + try { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded = false; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(ICacheKey key) { + isLoaded = true; + return iCacheKeyList.get(finalJ).key; + } + }; + loadAwareCacheLoaderList.add(loadAwareCacheLoader); + tieredSpilloverCache.computeIfAbsent(iCacheKeyList.get(finalJ), loadAwareCacheLoader); + } catch (Exception e) { + throw new RuntimeException(e); + } + countDownLatch.countDown(); + }); + } + } + countDownLatch.await(); + int numberOfTimesKeyLoaded = 0; + assertEquals(iterations * numberOfKeys, loadAwareCacheLoaderList.size()); + for (int i = 0; i < loadAwareCacheLoaderList.size(); i++) { + LoadAwareCacheLoader, String> loader = loadAwareCacheLoaderList.get(i); + if (loader.isLoaded()) { + numberOfTimesKeyLoaded++; + } + } + assertEquals(numberOfKeys, numberOfTimesKeyLoaded); // It should be loaded only once. + // We should see only one heap miss, and the rest hits + assertEquals(numberOfKeys, getMissesForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals((iterations * numberOfKeys) - numberOfKeys, getHitsForTier(tieredSpilloverCache, + TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + executorService.shutdownNow(); + } + + public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception { + int onHeapCacheSize = randomIntBetween(100, 300); + int diskCacheSize = randomIntBetween(200, 400); + int keyValueSize = 50; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + Settings settings = Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .build(); + + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + settings, + 0 + ); + + int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + String value = UUID.randomUUID().toString(); + AtomicInteger exceptionCount = new AtomicInteger(); + + Thread[] threads = new Thread[numberOfSameKeys]; + Phaser phaser = new Phaser(numberOfSameKeys + 1); + CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish. + + List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + + for (int i = 0; i < numberOfSameKeys; i++) { + threads[i] = new Thread(() -> { + try { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded = false; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(ICacheKey key) { + throw new RuntimeException("Testing"); + } + }; + loadAwareCacheLoaderList.add(loadAwareCacheLoader); + phaser.arriveAndAwaitAdvance(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } catch (Exception e) { + exceptionCount.incrementAndGet(); + assertEquals(ExecutionException.class, e.getClass()); + assertEquals(RuntimeException.class, e.getCause().getClass()); + assertEquals("Testing", e.getCause().getMessage()); + } finally { + countDownLatch.countDown(); + } + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); // Wait for rest of tasks to be cancelled. + + // Verify exception count was equal to number of requests + assertEquals(numberOfSameKeys, exceptionCount.get()); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + } + + public void testComputeIfAbsentConcurrentlyWithLoaderReturningNull() throws Exception { + int onHeapCacheSize = randomIntBetween(100, 300); + int diskCacheSize = randomIntBetween(200, 400); + int keyValueSize = 50; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + Settings settings = Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .build(); + + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + settings, + 0 + ); + + int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + String value = UUID.randomUUID().toString(); + AtomicInteger exceptionCount = new AtomicInteger(); + + Thread[] threads = new Thread[numberOfSameKeys]; + Phaser phaser = new Phaser(numberOfSameKeys + 1); + CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish. + + List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + + for (int i = 0; i < numberOfSameKeys; i++) { + threads[i] = new Thread(() -> { + try { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded = false; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(ICacheKey key) { + return null; + } + }; + loadAwareCacheLoaderList.add(loadAwareCacheLoader); + phaser.arriveAndAwaitAdvance(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } catch (Exception e) { + exceptionCount.incrementAndGet(); + assertEquals(ExecutionException.class, e.getClass()); + assertEquals(NullPointerException.class, e.getCause().getClass()); + assertEquals("Loader returned a null value", e.getCause().getMessage()); + } finally { + countDownLatch.countDown(); + } + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); // Wait for rest of tasks to be cancelled. + + // Verify exception count was equal to number of requests + assertEquals(numberOfSameKeys, exceptionCount.get()); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); } public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exception { From 1dcb7e9f0bb9528a769fff683b2eb4efc79550b7 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 11 Jun 2024 11:16:37 -0700 Subject: [PATCH 03/10] Adding java doc for the completable map Signed-off-by: Sagar Upadhyaya --- .../opensearch/cache/common/tier/TieredSpilloverCache.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index e70a7dffc4c02..4b635c4ea33fa 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -90,6 +90,10 @@ public class TieredSpilloverCache implements ICache { private final Map, TierInfo> caches; private final List> policies; + /** + * This map is used to handle concurrent requests for same key in computeIfAbsent() to ensure we load the value + * only once. + */ Map, CompletableFuture, V>>> completableFutureMap = new ConcurrentHashMap<>(); TieredSpilloverCache(Builder builder) { From 6cf22488f51e58c2aadd99e5afeebed885f7ba98 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 18 Jun 2024 00:12:46 -0700 Subject: [PATCH 04/10] Changes to call future handler only once per key Signed-off-by: Sagar Upadhyaya --- .../common/tier/TieredSpilloverCache.java | 26 +++++++++---------- .../tier/TieredSpilloverCacheTests.java | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 4b635c4ea33fa..c54d719bb39b5 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -8,6 +8,8 @@ package org.opensearch.cache.common.tier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.cache.common.policy.TookTimePolicy; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; @@ -65,6 +67,7 @@ public class TieredSpilloverCache implements ICache { // Used to avoid caching stale entries in lower tiers. private static final List SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY); + private static final Logger logger = LogManager.getLogger(TieredSpilloverCache.class); private final ICache diskCache; private final ICache onHeapCache; @@ -230,11 +233,9 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> } private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { - // A future that returns a pair of key/value. - CompletableFuture, V>> completableFuture = new CompletableFuture<>(); // Only one of the threads will succeed putting a future into map for the same key. - // Rest will fetch existing future. - CompletableFuture, V>> future = completableFutureMap.putIfAbsent(key, completableFuture); + // Rest will fetch existing future and wait on that to complete. + CompletableFuture, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>()); // Handler to handle results post processing. Takes a tuple or exception as an input and returns // the value. Also before returning value, puts the value in cache. BiFunction, V>, Throwable, V> handler = (pair, ex) -> { @@ -245,15 +246,18 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader } value = pair.v2(); // Returning a value itself assuming that a next get should return the same. Should // be safe to assume if we got no exception and reached here. + } else { + if (ex != null) { + logger.warn("Exception occurred while trying to compute the value", ex); + } } completableFutureMap.remove(key); // Remove key from map as not needed anymore. return value; }; - CompletableFuture completableValue; if (future == null) { - future = completableFuture; - completableValue = future.handle(handler); - V value; + V value = null; + future = completableFutureMap.get(key); + future.handle(handler); try { value = loader.load(key); } catch (Exception ex) { @@ -267,15 +271,11 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader } else { future.complete(new Tuple<>(key, value)); } - - } else { - completableValue = future.handle(handler); } V value; try { - value = completableValue.get(); + value = future.get().v2(); if (future.isCompletedExceptionally()) { - future.get(); // call get to force the same exception to be thrown for other concurrent callers throw new IllegalStateException("Future completed exceptionally but no error thrown"); } } catch (InterruptedException ex) { diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 90652ff5197ed..46fa78c1bed54 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -807,7 +807,7 @@ public String load(ICacheKey key) { }; loadAwareCacheLoaderList.add(loadAwareCacheLoader); phaser.arriveAndAwaitAdvance(); - tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + assertEquals(value, tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader)); } catch (Exception e) { throw new RuntimeException(e); } From ebbe8f37a6ca35c07370dfa2878c7b620d2da714 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 18 Jun 2024 00:48:37 -0700 Subject: [PATCH 05/10] Fixing spotless check Signed-off-by: Sagar Upadhyaya --- .../opensearch/cache/common/tier/TieredSpilloverCache.java | 7 ++----- .../cache/common/tier/TieredSpilloverCacheTests.java | 7 +++---- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index c54d719bb39b5..6cca4266ed315 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -238,21 +238,18 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader CompletableFuture, V>> future = completableFutureMap.putIfAbsent(key, new CompletableFuture<>()); // Handler to handle results post processing. Takes a tuple or exception as an input and returns // the value. Also before returning value, puts the value in cache. - BiFunction, V>, Throwable, V> handler = (pair, ex) -> { - V value = null; + BiFunction, V>, Throwable, Void> handler = (pair, ex) -> { if (pair != null) { try (ReleasableLock ignore = writeLock.acquire()) { onHeapCache.put(pair.v1(), pair.v2()); } - value = pair.v2(); // Returning a value itself assuming that a next get should return the same. Should - // be safe to assume if we got no exception and reached here. } else { if (ex != null) { logger.warn("Exception occurred while trying to compute the value", ex); } } completableFutureMap.remove(key); // Remove key from map as not needed anymore. - return value; + return null; }; if (future == null) { V value = null; diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 46fa78c1bed54..1fc5a12a2d124 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -858,12 +858,12 @@ public void testComputIfAbsentConcurrentlyWithMultipleKeys() throws Exception { int iterations = 10; int numberOfKeys = 20; List> iCacheKeyList = new ArrayList<>(); - for (int i = 0; i< numberOfKeys; i++) { + for (int i = 0; i < numberOfKeys; i++) { ICacheKey key = getICacheKey(UUID.randomUUID().toString()); iCacheKeyList.add(key); } ExecutorService executorService = Executors.newFixedThreadPool(8); - CountDownLatch countDownLatch = new CountDownLatch(iterations*numberOfKeys); // To wait for all threads to finish. + CountDownLatch countDownLatch = new CountDownLatch(iterations * numberOfKeys); // To wait for all threads to finish. List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); @@ -907,8 +907,7 @@ public String load(ICacheKey key) { assertEquals(numberOfKeys, numberOfTimesKeyLoaded); // It should be loaded only once. // We should see only one heap miss, and the rest hits assertEquals(numberOfKeys, getMissesForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); - assertEquals((iterations * numberOfKeys) - numberOfKeys, getHitsForTier(tieredSpilloverCache, - TIER_DIMENSION_VALUE_ON_HEAP)); + assertEquals((iterations * numberOfKeys) - numberOfKeys, getHitsForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_ON_HEAP)); assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); executorService.shutdownNow(); } From 2285daa6bcf52af523d87582533e47a9230844ee Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 18 Jun 2024 09:50:26 -0700 Subject: [PATCH 06/10] Added changelog Signed-off-by: Sagar Upadhyaya --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0b5dd289ec85..9a408d5d325d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650)) - Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481)) - Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636)) +- [Tiered Caching] Move query recomputation logic outside write lock ([#14187](https://github.com/opensearch-project/OpenSearch/pull/14187)) ### Deprecated From 5262362cca2933a37eb48f137c19c7b18e2e08aa Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 21 Jun 2024 15:07:59 -0700 Subject: [PATCH 07/10] Addressing comments Signed-off-by: Sagar Upadhyaya --- .../common/tier/TieredSpilloverCache.java | 36 ++- .../tier/TieredSpilloverCacheTests.java | 305 +++++++++++------- 2 files changed, 220 insertions(+), 121 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 91890cd25c2b1..b6d6913a9f8d4 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -242,6 +242,11 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader if (pair != null) { try (ReleasableLock ignore = writeLock.acquire()) { onHeapCache.put(pair.v1(), pair.v2()); + } catch (Exception e) { + // TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal + // listeners/stats. Needs better exception handling at underlying layers.For now swallowing + // exception. + logger.warn("Exception occurred while putting item onto heap cache", e); } } else { if (ex != null) { @@ -251,8 +256,8 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader completableFutureMap.remove(key); // Remove key from map as not needed anymore. return null; }; + V value = null; if (future == null) { - V value = null; future = completableFutureMap.get(key); future.handle(handler); try { @@ -268,15 +273,12 @@ private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader } else { future.complete(new Tuple<>(key, value)); } - } - V value; - try { - value = future.get().v2(); - if (future.isCompletedExceptionally()) { - throw new IllegalStateException("Future completed exceptionally but no error thrown"); + } else { + try { + value = future.get().v2(); + } catch (InterruptedException ex) { + throw new IllegalStateException(ex); } - } catch (InterruptedException ex) { - throw new IllegalStateException(ex); } return value; } @@ -387,12 +389,22 @@ void handleRemovalFromHeapTier(RemovalNotification, V> notification ICacheKey key = notification.getKey(); boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()); boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier - if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) { + boolean exceptionOccurredOnDiskCachePut = false; + boolean canCacheOnDisk = caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue()); + if (canCacheOnDisk) { try (ReleasableLock ignore = writeLock.acquire()) { diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats + } catch (Exception ex) { + // TODO: Catch specific exceptions. Needs better exception handling. We are just swallowing exception + // in this case as it shouldn't cause upstream request to fail. + logger.warn("Exception occurred while putting item to disk cache", ex); + exceptionOccurredOnDiskCachePut = true; } - updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue()); - } else { + if (!exceptionOccurredOnDiskCachePut) { + updateStatsOnPut(TIER_DIMENSION_VALUE_DISK, key, notification.getValue()); + } + } + if (!canCacheOnDisk || exceptionOccurredOnDiskCachePut) { // If the value is not going to the disk cache, send this notification to the TSC's removal listener // as the value is leaving the TSC entirely removalListener.onRemoval(notification); diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index a63aae4e096c2..98a4cc8bd3924 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -60,6 +60,10 @@ import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK; import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TieredSpilloverCacheTests extends OpenSearchTestCase { static final List dimensionNames = List.of("dim1", "dim2", "dim3"); @@ -866,10 +870,9 @@ public void testComputIfAbsentConcurrentlyWithMultipleKeys() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(iterations * numberOfKeys); // To wait for all threads to finish. List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); - - for (int i = 0; i < iterations; i++) { - for (int j = 0; j < numberOfKeys; j++) { - int finalJ = j; + for (int j = 0; j < numberOfKeys; j++) { + int finalJ = j; + for (int i = 0; i < iterations; i++) { executorService.submit(() -> { try { LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { @@ -890,8 +893,9 @@ public String load(ICacheKey key) { tieredSpilloverCache.computeIfAbsent(iCacheKeyList.get(finalJ), loadAwareCacheLoader); } catch (Exception e) { throw new RuntimeException(e); + } finally { + countDownLatch.countDown(); } - countDownLatch.countDown(); }); } } @@ -913,6 +917,23 @@ public String load(ICacheKey key) { } public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded = false; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public String load(ICacheKey key) { + throw new RuntimeException("Testing"); + } + }; + verifyComputeIfAbsentThrowsException(RuntimeException.class, loadAwareCacheLoader, "Testing"); + } + + public void testComputeIfAbsentWithOnHeapCacheThrowingExceptionOnPut() throws Exception { int onHeapCacheSize = randomIntBetween(100, 300); int diskCacheSize = randomIntBetween(200, 400); int keyValueSize = 50; @@ -926,67 +947,60 @@ public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception onHeapCacheSize * keyValueSize + "b" ) .build(); + ICache.Factory onHeapCacheFactory = mock(OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.class); + ICache mockOnHeapCache = mock(ICache.class); + when(onHeapCacheFactory.create(any(), any(), any())).thenReturn(mockOnHeapCache); + doThrow(new RuntimeException("Testing")).when(mockOnHeapCache).put(any(), any()); + CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) + .setKeyType(String.class) + .setWeigher((k, v) -> keyValueSize) + .setSettings(settings) + .setDimensionNames(dimensionNames) + .setRemovalListener(removalListener) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setSettings( + Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") + .put(settings) + .build() + ) + .setClusterSettings(clusterSettings) + .build(); + ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(0, diskCacheSize, false); - TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( - keyValueSize, - diskCacheSize, - removalListener, - settings, - 0 - ); - - int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); - ICacheKey key = getICacheKey(UUID.randomUUID().toString()); - String value = UUID.randomUUID().toString(); - AtomicInteger exceptionCount = new AtomicInteger(); - - Thread[] threads = new Thread[numberOfSameKeys]; - Phaser phaser = new Phaser(numberOfSameKeys + 1); - CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish. - - List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); - - for (int i = 0; i < numberOfSameKeys; i++) { - threads[i] = new Thread(() -> { - try { - LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { - boolean isLoaded = false; - - @Override - public boolean isLoaded() { - return isLoaded; - } + TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder().setCacheType( + CacheType.INDICES_REQUEST_CACHE + ) + .setRemovalListener(removalListener) + .setOnHeapCacheFactory(onHeapCacheFactory) + .setDiskCacheFactory(mockDiskCacheFactory) + .setCacheConfig(cacheConfig) + .build(); + String value = ""; + try { + value = tieredSpilloverCache.computeIfAbsent(getICacheKey("test"), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } - @Override - public String load(ICacheKey key) { - throw new RuntimeException("Testing"); - } - }; - loadAwareCacheLoaderList.add(loadAwareCacheLoader); - phaser.arriveAndAwaitAdvance(); - tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); - } catch (Exception e) { - exceptionCount.incrementAndGet(); - assertEquals(ExecutionException.class, e.getClass()); - assertEquals(RuntimeException.class, e.getCause().getClass()); - assertEquals("Testing", e.getCause().getMessage()); - } finally { - countDownLatch.countDown(); + @Override + public String load(ICacheKey key) { + return "test"; } }); - threads[i].start(); - } - phaser.arriveAndAwaitAdvance(); - countDownLatch.await(); // Wait for rest of tasks to be cancelled. - - // Verify exception count was equal to number of requests - assertEquals(numberOfSameKeys, exceptionCount.get()); + } catch (Exception ex) {} + assertEquals("test", value); assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); } - public void testComputeIfAbsentConcurrentlyWithLoaderReturningNull() throws Exception { - int onHeapCacheSize = randomIntBetween(100, 300); - int diskCacheSize = randomIntBetween(200, 400); + public void testComputeIfAbsentWithDiskCacheThrowingExceptionOnPut() throws Exception { + int onHeapCacheSize = 0; int keyValueSize = 50; MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); @@ -998,62 +1012,75 @@ public void testComputeIfAbsentConcurrentlyWithLoaderReturningNull() throws Exce onHeapCacheSize * keyValueSize + "b" ) .build(); + ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); + CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) + .setKeyType(String.class) + .setWeigher((k, v) -> keyValueSize) + .setSettings(settings) + .setDimensionNames(dimensionNames) + .setRemovalListener(removalListener) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setSettings( + Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") + .put(settings) + .build() + ) + .setClusterSettings(clusterSettings) + .build(); + ICache.Factory mockDiskCacheFactory = mock(MockDiskCache.MockDiskCacheFactory.class); + ICache mockDiskCache = mock(ICache.class); + when(mockDiskCacheFactory.create(any(), any(), any())).thenReturn(mockDiskCache); + doThrow(new RuntimeException("Test")).when(mockDiskCache).put(any(), any()); - TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( - keyValueSize, - diskCacheSize, - removalListener, - settings, - 0 - ); - - int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); - ICacheKey key = getICacheKey(UUID.randomUUID().toString()); - String value = UUID.randomUUID().toString(); - AtomicInteger exceptionCount = new AtomicInteger(); - - Thread[] threads = new Thread[numberOfSameKeys]; - Phaser phaser = new Phaser(numberOfSameKeys + 1); - CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish. + TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder().setCacheType( + CacheType.INDICES_REQUEST_CACHE + ) + .setRemovalListener(removalListener) + .setOnHeapCacheFactory(onHeapCacheFactory) + .setDiskCacheFactory(mockDiskCacheFactory) + .setCacheConfig(cacheConfig) + .build(); + String value = ""; + value = tieredSpilloverCache.computeIfAbsent(getICacheKey("test"), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } - List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + @Override + public String load(ICacheKey key) { + return "test"; + } + }); + ImmutableCacheStats diskStats = getStatsSnapshotForTier(tieredSpilloverCache, TIER_DIMENSION_VALUE_DISK); - for (int i = 0; i < numberOfSameKeys; i++) { - threads[i] = new Thread(() -> { - try { - LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { - boolean isLoaded = false; + assertEquals(0, diskStats.getSizeInBytes()); + assertEquals(1, removalListener.evictionsMetric.count()); + assertEquals("test", value); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + } - @Override - public boolean isLoaded() { - return isLoaded; - } + public void testComputeIfAbsentConcurrentlyWithLoaderReturningNull() throws Exception { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + boolean isLoaded = false; - @Override - public String load(ICacheKey key) { - return null; - } - }; - loadAwareCacheLoaderList.add(loadAwareCacheLoader); - phaser.arriveAndAwaitAdvance(); - tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); - } catch (Exception e) { - exceptionCount.incrementAndGet(); - assertEquals(ExecutionException.class, e.getClass()); - assertEquals(NullPointerException.class, e.getCause().getClass()); - assertEquals("Loader returned a null value", e.getCause().getMessage()); - } finally { - countDownLatch.countDown(); - } - }); - threads[i].start(); - } - phaser.arriveAndAwaitAdvance(); - countDownLatch.await(); // Wait for rest of tasks to be cancelled. + @Override + public boolean isLoaded() { + return isLoaded; + } - // Verify exception count was equal to number of requests - assertEquals(numberOfSameKeys, exceptionCount.get()); - assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + @Override + public String load(ICacheKey key) { + return null; + } + }; + verifyComputeIfAbsentThrowsException(NullPointerException.class, loadAwareCacheLoader, "Loader returned a " + "null value"); } public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exception { @@ -1731,6 +1758,66 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache t return snapshot; } + private void verifyComputeIfAbsentThrowsException( + Class expectedException, + LoadAwareCacheLoader loader, + String expectedExceptionMessage + ) throws InterruptedException { + int onHeapCacheSize = randomIntBetween(100, 300); + int diskCacheSize = randomIntBetween(200, 400); + int keyValueSize = 50; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + Settings settings = Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .build(); + + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + settings, + 0 + ); + + int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + String value = UUID.randomUUID().toString(); + AtomicInteger exceptionCount = new AtomicInteger(); + + Thread[] threads = new Thread[numberOfSameKeys]; + Phaser phaser = new Phaser(numberOfSameKeys + 1); + CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish. + + for (int i = 0; i < numberOfSameKeys; i++) { + threads[i] = new Thread(() -> { + try { + phaser.arriveAndAwaitAdvance(); + tieredSpilloverCache.computeIfAbsent(key, loader); + } catch (Exception e) { + exceptionCount.incrementAndGet(); + assertEquals(ExecutionException.class, e.getClass()); + assertEquals(expectedException, e.getCause().getClass()); + assertEquals(expectedExceptionMessage, e.getCause().getMessage()); + } finally { + countDownLatch.countDown(); + } + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); // Wait for rest of tasks to be cancelled. + + // Verify exception count was equal to number of requests + assertEquals(numberOfSameKeys, exceptionCount.get()); + assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); + } + private ImmutableCacheStats getTotalStatsSnapshot(TieredSpilloverCache tsc) throws IOException { ImmutableCacheStatsHolder cacheStats = tsc.stats(new String[0]); return cacheStats.getStatsForDimensionValues(List.of()); From 7fb3893764d739855355cefe73c754f366a1b3aa Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 21 Jun 2024 15:26:39 -0700 Subject: [PATCH 08/10] Fixing gradle fail Signed-off-by: Sagar Upadhyaya --- .../cache/common/tier/TieredSpilloverCacheTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 98a4cc8bd3924..39eb3bc7d142c 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -933,6 +933,7 @@ public String load(ICacheKey key) { verifyComputeIfAbsentThrowsException(RuntimeException.class, loadAwareCacheLoader, "Testing"); } + @SuppressWarnings({ "rawtypes", "unchecked" }) public void testComputeIfAbsentWithOnHeapCacheThrowingExceptionOnPut() throws Exception { int onHeapCacheSize = randomIntBetween(100, 300); int diskCacheSize = randomIntBetween(200, 400); @@ -999,6 +1000,7 @@ public String load(ICacheKey key) { assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); } + @SuppressWarnings({ "rawtypes", "unchecked" }) public void testComputeIfAbsentWithDiskCacheThrowingExceptionOnPut() throws Exception { int onHeapCacheSize = 0; int keyValueSize = 50; @@ -1758,6 +1760,7 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache t return snapshot; } + @SuppressWarnings({ "rawtypes", "unchecked" }) private void verifyComputeIfAbsentThrowsException( Class expectedException, LoadAwareCacheLoader loader, From c7881dfa76656f75429e39384fbf720fb0201ef5 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 24 Jun 2024 16:44:59 -0700 Subject: [PATCH 09/10] Addressing comments to refactor unit test Signed-off-by: Sagar Upadhyaya --- .../tier/TieredSpilloverCacheTests.java | 157 +++++++++--------- 1 file changed, 76 insertions(+), 81 deletions(-) diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 39eb3bc7d142c..14dfdb67f1add 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -952,50 +952,28 @@ public void testComputeIfAbsentWithOnHeapCacheThrowingExceptionOnPut() throws Ex ICache mockOnHeapCache = mock(ICache.class); when(onHeapCacheFactory.create(any(), any(), any())).thenReturn(mockOnHeapCache); doThrow(new RuntimeException("Testing")).when(mockOnHeapCache).put(any(), any()); - CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) - .setKeyType(String.class) - .setWeigher((k, v) -> keyValueSize) - .setSettings(settings) - .setDimensionNames(dimensionNames) - .setRemovalListener(removalListener) - .setKeySerializer(new StringSerializer()) - .setValueSerializer(new StringSerializer()) - .setSettings( - Settings.builder() - .put( - CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), - TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME - ) - .put(FeatureFlags.PLUGGABLE_CACHE, "true") - .put(settings) - .build() - ) - .setClusterSettings(clusterSettings) - .build(); + CacheConfig cacheConfig = getCacheConfig(keyValueSize, settings, removalListener); ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(0, diskCacheSize, false); - TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder().setCacheType( - CacheType.INDICES_REQUEST_CACHE - ) - .setRemovalListener(removalListener) - .setOnHeapCacheFactory(onHeapCacheFactory) - .setDiskCacheFactory(mockDiskCacheFactory) - .setCacheConfig(cacheConfig) - .build(); + TieredSpilloverCache tieredSpilloverCache = getTieredSpilloverCache( + onHeapCacheFactory, + mockDiskCacheFactory, + cacheConfig, + null, + removalListener + ); String value = ""; - try { - value = tieredSpilloverCache.computeIfAbsent(getICacheKey("test"), new LoadAwareCacheLoader<>() { - @Override - public boolean isLoaded() { - return false; - } + value = tieredSpilloverCache.computeIfAbsent(getICacheKey("test"), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } - @Override - public String load(ICacheKey key) { - return "test"; - } - }); - } catch (Exception ex) {} + @Override + public String load(ICacheKey key) { + return "test"; + } + }); assertEquals("test", value); assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); } @@ -1015,39 +993,20 @@ public void testComputeIfAbsentWithDiskCacheThrowingExceptionOnPut() throws Exce ) .build(); ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); - CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) - .setKeyType(String.class) - .setWeigher((k, v) -> keyValueSize) - .setSettings(settings) - .setDimensionNames(dimensionNames) - .setRemovalListener(removalListener) - .setKeySerializer(new StringSerializer()) - .setValueSerializer(new StringSerializer()) - .setSettings( - Settings.builder() - .put( - CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), - TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME - ) - .put(FeatureFlags.PLUGGABLE_CACHE, "true") - .put(settings) - .build() - ) - .setClusterSettings(clusterSettings) - .build(); + CacheConfig cacheConfig = getCacheConfig(keyValueSize, settings, removalListener); ICache.Factory mockDiskCacheFactory = mock(MockDiskCache.MockDiskCacheFactory.class); ICache mockDiskCache = mock(ICache.class); when(mockDiskCacheFactory.create(any(), any(), any())).thenReturn(mockDiskCache); doThrow(new RuntimeException("Test")).when(mockDiskCache).put(any(), any()); - TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder().setCacheType( - CacheType.INDICES_REQUEST_CACHE - ) - .setRemovalListener(removalListener) - .setOnHeapCacheFactory(onHeapCacheFactory) - .setDiskCacheFactory(mockDiskCacheFactory) - .setCacheConfig(cacheConfig) - .build(); + TieredSpilloverCache tieredSpilloverCache = getTieredSpilloverCache( + onHeapCacheFactory, + mockDiskCacheFactory, + cacheConfig, + null, + removalListener + ); + String value = ""; value = tieredSpilloverCache.computeIfAbsent(getICacheKey("test"), new LoadAwareCacheLoader<>() { @Override @@ -1082,7 +1041,7 @@ public String load(ICacheKey key) { return null; } }; - verifyComputeIfAbsentThrowsException(NullPointerException.class, loadAwareCacheLoader, "Loader returned a " + "null value"); + verifyComputeIfAbsentThrowsException(NullPointerException.class, loadAwareCacheLoader, "Loader returned a null value"); } public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exception { @@ -1667,6 +1626,26 @@ public boolean isLoaded() { }; } + private TieredSpilloverCache getTieredSpilloverCache( + ICache.Factory onHeapCacheFactory, + ICache.Factory mockDiskCacheFactory, + CacheConfig cacheConfig, + List> policies, + RemovalListener, String> removalListener + ) { + TieredSpilloverCache.Builder builder = new TieredSpilloverCache.Builder().setCacheType( + CacheType.INDICES_REQUEST_CACHE + ) + .setRemovalListener(removalListener) + .setOnHeapCacheFactory(onHeapCacheFactory) + .setDiskCacheFactory(mockDiskCacheFactory) + .setCacheConfig(cacheConfig); + if (policies != null) { + builder.addPolicies(policies); + } + return builder.build(); + } + private TieredSpilloverCache initializeTieredSpilloverCache( int keyValueSize, int diskCacheSize, @@ -1709,17 +1688,34 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .build(); ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize, false); - TieredSpilloverCache.Builder builder = new TieredSpilloverCache.Builder().setCacheType( - CacheType.INDICES_REQUEST_CACHE - ) + return getTieredSpilloverCache(onHeapCacheFactory, mockDiskCacheFactory, cacheConfig, policies, removalListener); + } + + private CacheConfig getCacheConfig( + int keyValueSize, + Settings settings, + RemovalListener, String> removalListener + ) { + return new CacheConfig.Builder().setKeyType(String.class) + .setKeyType(String.class) + .setWeigher((k, v) -> keyValueSize) + .setSettings(settings) + .setDimensionNames(dimensionNames) .setRemovalListener(removalListener) - .setOnHeapCacheFactory(onHeapCacheFactory) - .setDiskCacheFactory(mockDiskCacheFactory) - .setCacheConfig(cacheConfig); - if (policies != null) { - builder.addPolicies(policies); - } - return builder.build(); + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setSettings( + Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") + .put(settings) + .build() + ) + .setClusterSettings(clusterSettings) + .build(); } // Helper functions for extracting tier aggregated stats. @@ -1760,10 +1756,9 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache t return snapshot; } - @SuppressWarnings({ "rawtypes", "unchecked" }) private void verifyComputeIfAbsentThrowsException( Class expectedException, - LoadAwareCacheLoader loader, + LoadAwareCacheLoader, String> loader, String expectedExceptionMessage ) throws InterruptedException { int onHeapCacheSize = randomIntBetween(100, 300); From 3686d2c1201b366f60105025d0d759ce2b241834 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Mon, 24 Jun 2024 18:00:28 -0700 Subject: [PATCH 10/10] minor UT refactor Signed-off-by: Sagar Upadhyaya --- .../cache/common/tier/TieredSpilloverCacheTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 14dfdb67f1add..b9c7bbdb77d3d 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -1007,8 +1007,8 @@ public void testComputeIfAbsentWithDiskCacheThrowingExceptionOnPut() throws Exce removalListener ); - String value = ""; - value = tieredSpilloverCache.computeIfAbsent(getICacheKey("test"), new LoadAwareCacheLoader<>() { + String response = ""; + response = tieredSpilloverCache.computeIfAbsent(getICacheKey("test"), new LoadAwareCacheLoader<>() { @Override public boolean isLoaded() { return false; @@ -1023,7 +1023,7 @@ public String load(ICacheKey key) { assertEquals(0, diskStats.getSizeInBytes()); assertEquals(1, removalListener.evictionsMetric.count()); - assertEquals("test", value); + assertEquals("test", response); assertEquals(0, tieredSpilloverCache.completableFutureMap.size()); }