From 93bfc5fadfcc0d6d820b8b15d97c5e94006052c6 Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Thu, 25 Apr 2024 22:09:42 -0700 Subject: [PATCH] [Tiered Caching] Expose a dynamic setting to disable/enable disk cache (#13373) * [Tiered Caching] Expose a dynamic setting to disable/enable disk cache Signed-off-by: Sagar Upadhyaya * Putting tiered cache settings behind feature flag Signed-off-by: Sagar Upadhyaya * Adding a changelog Signed-off-by: Sagar Upadhyaya * Addressing Sorabh's comments Signed-off-by: Sagar Upadhyaya * Putting new setting behind feature flag Signed-off-by: Sagar Upadhyaya --------- Signed-off-by: Sagar Upadhyaya Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> --- CHANGELOG.md | 1 + .../TieredSpilloverCacheIT.java | 115 ++++++++++++++++++ .../common/tier/TieredSpilloverCache.java | 74 +++++++---- .../tier/TieredSpilloverCachePlugin.java | 13 +- .../tier/TieredSpilloverCacheSettings.java | 22 +++- .../tier/TieredSpilloverCachePluginTests.java | 11 +- .../tier/TieredSpilloverCacheTests.java | 115 ++++++++++++++++++ 7 files changed, 323 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51ff76a7dba4a..07121a6799589 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) - Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601)) +- [Tiered Caching] Add a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java index 977a66c53b7e8..cbe16a690c104 100644 --- a/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java +++ b/modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java @@ -425,6 +425,121 @@ public void testWithExplicitCacheClear() throws Exception { }, 1, TimeUnit.SECONDS); } + public void testWithDynamicDiskCacheSetting() throws Exception { + int onHeapCacheSizeInBytes = 10; // Keep it low so that all items are cached onto disk. + internalCluster().startNode( + Settings.builder() + .put(defaultSettings(onHeapCacheSizeInBytes + "b")) + .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1)) + .build() + ); + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.refresh_interval", -1) + ) + .get() + ); + // Update took time policy to zero so that all entries are eligible to be cached on disk. + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put( + TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), + new TimeValue(0, TimeUnit.MILLISECONDS) + ) + .build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + int numberOfIndexedItems = randomIntBetween(5, 10); + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator)); + } + ensureSearchable("index"); + refreshAndWaitForReplication(); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + long perQuerySizeInCacheInBytes = -1; + // Step 1: Hit some queries. We will see misses and queries will be cached(onto disk cache) for subsequent + // requests. + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + if (perQuerySizeInCacheInBytes == -1) { + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes(); + } + assertSearchResponse(resp); + } + + RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes()); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(0, requestCacheStats.getHitCount()); + assertEquals(0, requestCacheStats.getEvictions()); + + // Step 2: Hit same queries again. We will see hits now. + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + assertSearchResponse(resp); + } + requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes()); + assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount()); + assertEquals(0, requestCacheStats.getEvictions()); + long lastKnownHitCount = requestCacheStats.getHitCount(); + long lastKnownMissCount = requestCacheStats.getMissCount(); + + // Step 3: Turn off disk cache now. And hit same queries again. We should not see hits now as all queries + // were cached onto disk cache. + updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put(TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), false) + .build() + ); + assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get()); + + for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) { + SearchResponse resp = client.prepareSearch("index") + .setRequestCache(true) + .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator)) + .get(); + assertSearchResponse(resp); + } + requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes()); // + // Still shows disk cache entries as explicit clear or invalidation is required to clean up disk cache. + assertEquals(lastKnownMissCount + numberOfIndexedItems, requestCacheStats.getMissCount()); + assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount()); // No new hits being seen. + lastKnownMissCount = requestCacheStats.getMissCount(); + lastKnownHitCount = requestCacheStats.getHitCount(); + + // Step 4: Invalidate entries via refresh. + // Explicit refresh would invalidate cache entries. + refreshAndWaitForReplication(); + assertBusy(() -> { + // Explicit refresh should clear up cache entries + assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0); + }, 1, TimeUnit.SECONDS); + requestCacheStats = getRequestCacheStats(client, "index"); + assertEquals(0, lastKnownMissCount - requestCacheStats.getMissCount()); + assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount()); + } + private RequestCacheStats getRequestCacheStats(Client client, String indexName) { return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache(); } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index ae3d9f1dbcf62..34f17df751d7a 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -27,8 +27,9 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -38,6 +39,8 @@ import java.util.function.Function; import java.util.function.Predicate; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP; + /** * This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap * and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full, @@ -67,12 +70,14 @@ public class TieredSpilloverCache implements ICache { /** * Maintains caching tiers in ascending order of cache latency. */ - private final List> cacheList; + private final Map, Boolean> caches; private final List> policies; TieredSpilloverCache(Builder builder) { Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null"); Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null"); + Objects.requireNonNull(builder.cacheConfig, "cache config can't be null"); + Objects.requireNonNull(builder.cacheConfig.getClusterSettings(), "cluster settings can't be null"); this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null"); this.onHeapCache = builder.onHeapCacheFactory.create( @@ -80,7 +85,8 @@ public class TieredSpilloverCache implements ICache { @Override public void onRemoval(RemovalNotification, V> notification) { try (ReleasableLock ignore = writeLock.acquire()) { - if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()) + if (caches.get(diskCache) + && SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason()) && evaluatePolicies(notification.getValue())) { diskCache.put(notification.getKey(), notification.getValue()); } else { @@ -103,9 +109,15 @@ && evaluatePolicies(notification.getValue())) { ); this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories); - this.cacheList = Arrays.asList(onHeapCache, diskCache); + Boolean isDiskCacheEnabled = DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType).get(builder.cacheConfig.getSettings()); + LinkedHashMap, Boolean> cacheListMap = new LinkedHashMap<>(); + cacheListMap.put(onHeapCache, true); + cacheListMap.put(diskCache, isDiskCacheEnabled); + this.caches = Collections.synchronizedMap(cacheListMap); this.dimensionNames = builder.cacheConfig.getDimensionNames(); this.policies = builder.policies; // Will never be null; builder initializes it to an empty list + builder.cacheConfig.getClusterSettings() + .addSettingsUpdateConsumer(DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType), this::enableDisableDiskCache); } // Package private for testing @@ -118,6 +130,13 @@ ICache getDiskCache() { return diskCache; } + // Package private for testing. + void enableDisableDiskCache(Boolean isDiskCacheEnabled) { + // When disk cache is disabled, we are not clearing up the disk cache entries yet as that should be part of + // separate cache/clear API. + this.caches.put(diskCache, isDiskCacheEnabled); + } + @Override public V get(ICacheKey key) { return getValueFromTieredCache().apply(key); @@ -132,7 +151,6 @@ public void put(ICacheKey key, V value) { @Override public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { - V cacheValue = getValueFromTieredCache().apply(key); if (cacheValue == null) { // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside. @@ -151,10 +169,10 @@ public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> public void invalidate(ICacheKey key) { // We are trying to invalidate the key from all caches though it would be present in only of them. // Doing this as we don't know where it is located. We could do a get from both and check that, but what will - // also trigger a hit/miss listener event, so ignoring it for now. + // also count hits/misses stats, so ignoring it for now. try (ReleasableLock ignore = writeLock.acquire()) { - for (ICache cache : cacheList) { - cache.invalidate(key); + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { + cacheEntry.getKey().invalidate(key); } } } @@ -162,8 +180,8 @@ public void invalidate(ICacheKey key) { @Override public void invalidateAll() { try (ReleasableLock ignore = writeLock.acquire()) { - for (ICache cache : cacheList) { - cache.invalidateAll(); + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { + cacheEntry.getKey().invalidateAll(); } } } @@ -175,15 +193,21 @@ public void invalidateAll() { @SuppressWarnings({ "unchecked" }) @Override public Iterable> keys() { - Iterable>[] iterables = (Iterable>[]) new Iterable[] { onHeapCache.keys(), diskCache.keys() }; - return new ConcatenatedIterables>(iterables); + List>> iterableList = new ArrayList<>(); + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { + iterableList.add(cacheEntry.getKey().keys()); + } + Iterable>[] iterables = (Iterable>[]) iterableList.toArray(new Iterable[0]); + return new ConcatenatedIterables<>(iterables); } @Override public long count() { long count = 0; - for (ICache cache : cacheList) { - count += cache.count(); + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { + // Count for all the tiers irrespective of whether they are enabled or not. As eventually + // this will turn to zero once cache is cleared up either via invalidation or manually. + count += cacheEntry.getKey().count(); } return count; } @@ -191,16 +215,17 @@ public long count() { @Override public void refresh() { try (ReleasableLock ignore = writeLock.acquire()) { - for (ICache cache : cacheList) { - cache.refresh(); + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { + cacheEntry.getKey().refresh(); } } } @Override public void close() throws IOException { - for (ICache cache : cacheList) { - cache.close(); + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { + // Close all the caches here irrespective of whether they are enabled or not. + cacheEntry.getKey().close(); } } @@ -212,13 +237,12 @@ public ImmutableCacheStatsHolder stats() { private Function, V> getValueFromTieredCache() { return key -> { try (ReleasableLock ignore = readLock.acquire()) { - for (ICache cache : cacheList) { - V value = cache.get(key); - if (value != null) { - // update hit stats - return value; - } else { - // update miss stats + for (Map.Entry, Boolean> cacheEntry : caches.entrySet()) { + if (cacheEntry.getValue()) { + V value = cacheEntry.getKey().get(key); + if (value != null) { + return value; + } } } } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index dfd40199d859e..1c10e51630460 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -11,6 +11,8 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.plugins.CachePlugin; import org.opensearch.plugins.Plugin; @@ -18,6 +20,7 @@ import java.util.List; import java.util.Map; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; /** @@ -30,10 +33,15 @@ public class TieredSpilloverCachePlugin extends Plugin implements CachePlugin { */ public static final String TIERED_CACHE_SPILLOVER_PLUGIN_NAME = "tieredSpilloverCachePlugin"; + private final Settings settings; + /** * Default constructor + * @param settings settings */ - public TieredSpilloverCachePlugin() {} + public TieredSpilloverCachePlugin(Settings settings) { + this.settings = settings; + } @Override public Map getCacheFactoryMap() { @@ -54,6 +62,9 @@ public List> getSettings() { TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)); + if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) { + settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType)); + } } return settingList; } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java index b89e8c517a351..e8e441d6bd3a6 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java @@ -42,6 +42,14 @@ public class TieredSpilloverCacheSettings { (key) -> Setting.simpleString(key, "", NodeScope) ); + /** + * Setting to disable/enable disk cache dynamically. + */ + public static final Setting.AffixSetting TIERED_SPILLOVER_DISK_CACHE_SETTING = Setting.suffixKeySetting( + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.enabled", + (key) -> Setting.boolSetting(key, true, NodeScope, Setting.Property.Dynamic) + ); + /** * Setting defining the minimum took time for a query to be allowed into the disk cache. */ @@ -63,17 +71,29 @@ public class TieredSpilloverCacheSettings { public static final Map> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; /** - * Fetches concrete took time policy settings. + * Stores disk cache enabled settings for various cache types as these are dynamic so that can be registered and + * retrieved accordingly. + */ + public static final Map> DISK_CACHE_ENABLED_SETTING_MAP; + + /** + * Fetches concrete took time policy and disk cache settings. */ static { Map> concreteTookTimePolicySettingMap = new HashMap<>(); + Map> diskCacheSettingMap = new HashMap<>(); for (CacheType cacheType : CacheType.values()) { concreteTookTimePolicySettingMap.put( cacheType, TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) ); + diskCacheSettingMap.put( + cacheType, + TIERED_SPILLOVER_DISK_CACHE_SETTING.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) + ); } TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = concreteTookTimePolicySettingMap; + DISK_CACHE_ENABLED_SETTING_MAP = diskCacheSettingMap; } /** diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCachePluginTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCachePluginTests.java index 1172a48e97c6a..4a96ffe2069ec 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCachePluginTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCachePluginTests.java @@ -9,6 +9,8 @@ package org.opensearch.cache.common.tier; import org.opensearch.common.cache.ICache; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.test.OpenSearchTestCase; import java.util.Map; @@ -16,9 +18,16 @@ public class TieredSpilloverCachePluginTests extends OpenSearchTestCase { public void testGetCacheFactoryMap() { - TieredSpilloverCachePlugin tieredSpilloverCachePlugin = new TieredSpilloverCachePlugin(); + TieredSpilloverCachePlugin tieredSpilloverCachePlugin = new TieredSpilloverCachePlugin(Settings.EMPTY); Map map = tieredSpilloverCachePlugin.getCacheFactoryMap(); assertNotNull(map.get(TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME)); assertEquals(TieredSpilloverCachePlugin.TIERED_CACHE_SPILLOVER_PLUGIN_NAME, tieredSpilloverCachePlugin.getName()); } + + public void testGetSettingsWithFeatureFlagOn() { + TieredSpilloverCachePlugin tieredSpilloverCachePlugin = new TieredSpilloverCachePlugin( + Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE_SETTING.getKey(), true).build() + ); + assertFalse(tieredSpilloverCachePlugin.getSettings().isEmpty()); + } } diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index bf9f8fd22d793..1ecb63414dc68 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -42,6 +42,7 @@ import java.util.function.Function; import java.util.function.Predicate; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP; import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; @@ -56,6 +57,7 @@ public void setup() { Settings settings = Settings.EMPTY; clusterSettings = new ClusterSettings(settings, new HashSet<>()); clusterSettings.registerSetting(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)); + clusterSettings.registerSetting(DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE)); } public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception { @@ -302,6 +304,7 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { ) .build() ) + .setClusterSettings(clusterSettings) .build(); ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(0, diskCacheSize); @@ -777,6 +780,7 @@ public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exceptio ) .build() ) + .setClusterSettings(clusterSettings) .setDimensionNames(dimensionNames) .build(); TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() @@ -1008,6 +1012,116 @@ public void testMinimumThresholdSettingValue() throws Exception { assertEquals(validDuration, concreteSetting.get(validSettings)); } + public void testPutWithDiskCacheDisabledSetting() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int keyValueSize = 50; + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .put(DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), false) + .build(), + 0 + ); + + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); // Create more items than onHeap cache. + for (int iter = 0; iter < numOfItems1; iter++) { + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + LoadAwareCacheLoader, String> loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } + ICache onHeapCache = tieredSpilloverCache.getOnHeapCache(); + ICache diskCache = tieredSpilloverCache.getDiskCache(); + assertEquals(onHeapCacheSize, onHeapCache.count()); + assertEquals(0, diskCache.count()); // Disk cache shouldn't have anything considering it is disabled. + assertEquals(numOfItems1 - onHeapCacheSize, removalListener.evictionsMetric.count()); + } + + public void testGetPutAndInvalidateWithDiskCacheDisabled() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int keyValueSize = 50; + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheRemovalListener removalListener = new MockCacheRemovalListener<>(); + TieredSpilloverCache tieredSpilloverCache = initializeTieredSpilloverCache( + keyValueSize, + diskCacheSize, + removalListener, + Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + onHeapCacheSize * keyValueSize + "b" + ) + .build(), + 0 + ); + + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize - 1); // Create more items than onHeap + // cache to cause spillover. + for (int iter = 0; iter < numOfItems1; iter++) { + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + LoadAwareCacheLoader, String> loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } + ICache onHeapCache = tieredSpilloverCache.getOnHeapCache(); + ICache diskCache = tieredSpilloverCache.getDiskCache(); + List> diskCacheKeys = new ArrayList<>(); + tieredSpilloverCache.getDiskCache().keys().forEach(diskCacheKeys::add); + long actualDiskCacheCount = diskCache.count(); + long actualTieredCacheCount = tieredSpilloverCache.count(); + assertEquals(onHeapCacheSize, onHeapCache.count()); + assertEquals(numOfItems1 - onHeapCacheSize, actualDiskCacheCount); + assertEquals(0, removalListener.evictionsMetric.count()); + assertEquals(numOfItems1, actualTieredCacheCount); + for (ICacheKey diskKey : diskCacheKeys) { + assertNotNull(tieredSpilloverCache.get(diskKey)); + } + + tieredSpilloverCache.enableDisableDiskCache(false); // Disable disk cache now. + int numOfItems2 = totalSize - numOfItems1; + for (int iter = 0; iter < numOfItems2; iter++) { + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + LoadAwareCacheLoader, String> loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } + for (ICacheKey diskKey : diskCacheKeys) { + assertNull(tieredSpilloverCache.get(diskKey)); // Considering disk cache is disabled, we shouldn't find + // these keys. + } + assertEquals(onHeapCacheSize, onHeapCache.count()); // Should remain same. + assertEquals(0, diskCache.count() - actualDiskCacheCount); // Considering it is disabled now, shouldn't cache + // any more items. + assertEquals(numOfItems2, removalListener.evictionsMetric.count()); // Considering onHeap cache was already + // full, we should all existing onHeap entries being evicted. + assertEquals(0, tieredSpilloverCache.count() - actualTieredCacheCount); // Count still returns disk cache + // entries count as they haven't been cleared yet. + long lastKnownTieredCacheEntriesCount = tieredSpilloverCache.count(); + + // Clear up disk cache keys. + for (ICacheKey diskKey : diskCacheKeys) { + tieredSpilloverCache.invalidate(diskKey); + } + assertEquals(0, diskCache.count()); + assertEquals(lastKnownTieredCacheEntriesCount - diskCacheKeys.size(), tieredSpilloverCache.count()); + + tieredSpilloverCache.invalidateAll(); // Clear up all the keys. + assertEquals(0, tieredSpilloverCache.count()); + } + private List getMockDimensions() { List dims = new ArrayList<>(); for (String dimensionName : dimensionNames) { @@ -1121,6 +1235,7 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .put(settings) .build() ) + .setClusterSettings(clusterSettings) .build(); ICache.Factory mockDiskCacheFactory = new MockDiskCache.MockDiskCacheFactory(diskDeliberateDelay, diskCacheSize);