Skip to content

Commit

Permalink
[Backport 2.x] [Tiered Caching] Segmented cache changes (#16229)
Browse files Browse the repository at this point in the history
* [Tiered Caching] Segmented cache changes  (#16047)

* Segmented cache changes for TieredCache

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Adding change log

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Allow segment number to be power of two

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Moving common tiered cache IT methods to a common base class

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Adding disk took-time IT test with multiple segments

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Correcting changelog

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Addressing comments

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Fixing invalid segment count variable name

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Introducing new settings for size for respective cache tier

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Changing the default segmentCount logic

Signed-off-by: Sagar Upadhyaya <[email protected]>

* Fixing missing java doc issue

Signed-off-by: Sagar Upadhyaya <[email protected]>

---------

Signed-off-by: Sagar Upadhyaya <[email protected]>
Signed-off-by: Sagar <[email protected]>

* Fixing compatibility test

Signed-off-by: Sagar Upadhyaya <[email protected]>

---------

Signed-off-by: Sagar Upadhyaya <[email protected]>
Signed-off-by: Sagar <[email protected]>
  • Loading branch information
sgup432 authored Oct 8, 2024
1 parent 2585a23 commit 86212cc
Show file tree
Hide file tree
Showing 23 changed files with 2,170 additions and 665 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967))
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))
- [Tiered Caching] Segmented cache changes ([#16047](https://github.com/opensearch-project/OpenSearch/pull/16047))
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;

/**
 * Common base class for tiered spillover cache integration tests. It provides the node settings
 * shared by those tests and a helper to pick a random segment count.
 */
public class TieredSpilloverCacheBaseIT extends OpenSearchIntegTestCase {

    /**
     * Builds node settings that enable the pluggable cache feature flag and configure the indices
     * request cache to use the tiered spillover cache, with {@link OpenSearchOnHeapCache} as the
     * on-heap store and {@link MockDiskCache} as the disk store.
     *
     * @param onHeapCacheSizeInBytesOrPercentage value for the tiered on-heap store size setting,
     *                                           e.g. {@code "1%"} or {@code "2000b"}
     * @param numberOfSegments                   value for the tiered spillover segments setting
     * @return the assembled settings
     */
    public Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage, int numberOfSegments) {
        // Every tiered-cache setting below is namespaced under the indices request cache prefix.
        final String requestCachePrefix = CacheType.INDICES_REQUEST_CACHE.getSettingPrefix();
        return Settings.builder()
            .put(FeatureFlags.PLUGGABLE_CACHE, "true")
            .put(
                CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
                TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
            )
            .put(
                TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(requestCachePrefix)
                    .getKey(),
                OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
            )
            .put(
                TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(requestCachePrefix)
                    .getKey(),
                MockDiskCache.MockDiskCacheFactory.NAME
            )
            .put(
                TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(requestCachePrefix).getKey(),
                numberOfSegments
            )
            .put(
                TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(requestCachePrefix)
                    .getKey(),
                onHeapCacheSizeInBytesOrPercentage
            )
            .build();
    }

    /**
     * Picks a random segment count for a test run.
     *
     * @return a randomly chosen power of two between 1 and 256 (inclusive)
     */
    public int getNumberOfSegments() {
        // 32 was previously missing from this list, so that segment count was never exercised.
        return randomFrom(1, 2, 4, 8, 16, 32, 64, 128, 256);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.IndicesRequestCache;
Expand All @@ -43,13 +39,15 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY;
import static org.opensearch.common.cache.settings.CacheSettings.INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE;
import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand All @@ -58,43 +56,15 @@
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase {
public class TieredSpilloverCacheIT extends TieredSpilloverCacheBaseIT {

/**
 * Registers the plugins every node in these tests is started with: the tiered spillover cache
 * plugin and the mock disk cache plugin.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final List<Class<? extends Plugin>> requiredPlugins = Arrays.asList(
        TieredSpilloverCachePlugin.class,
        MockDiskCachePlugin.class
    );
    return requiredPlugins;
}

/**
 * Builds node settings that enable the pluggable cache feature flag and configure the indices
 * request cache to use the tiered spillover cache, with the OpenSearch on-heap cache as the heap
 * store and the mock disk cache as the disk store.
 *
 * @param onHeapCacheSizeInBytesOrPercentage value for the on-heap cache maximum-size setting
 * @return the assembled settings
 */
static Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage) {
    final String requestCachePrefix = CacheType.INDICES_REQUEST_CACHE.getSettingPrefix();

    // Resolve the concrete setting keys up front so the builder calls below stay short.
    final String storeNameKey = CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey();
    final String onHeapStoreNameKey = TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME
        .getConcreteSettingForNamespace(requestCachePrefix)
        .getKey();
    final String diskStoreNameKey = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME
        .getConcreteSettingForNamespace(requestCachePrefix)
        .getKey();
    final String onHeapMaxSizeKey = OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
        .get(MAXIMUM_SIZE_IN_BYTES_KEY)
        .getKey();

    final Settings.Builder builder = Settings.builder();
    builder.put(FeatureFlags.PLUGGABLE_CACHE, "true");
    builder.put(storeNameKey, TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME);
    builder.put(onHeapStoreNameKey, OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME);
    builder.put(diskStoreNameKey, MockDiskCache.MockDiskCacheFactory.NAME);
    builder.put(onHeapMaxSizeKey, onHeapCacheSizeInBytesOrPercentage);
    return builder.build();
}

public void testPluginsAreInstalled() {
internalCluster().startNode(Settings.builder().put(defaultSettings("1%")).build());
internalCluster().startNode(Settings.builder().put(defaultSettings("1%", getNumberOfSegments())).build());
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
Expand All @@ -111,7 +81,8 @@ public void testPluginsAreInstalled() {
}

public void testSanityChecksWithIndicesRequestCache() throws InterruptedException {
internalCluster().startNodes(3, Settings.builder().put(defaultSettings("1%")).build());
int numberOfSegments = getNumberOfSegments();
internalCluster().startNodes(3, Settings.builder().put(defaultSettings("1%", numberOfSegments)).build());
Client client = client();
assertAcked(
client.admin()
Expand Down Expand Up @@ -147,9 +118,97 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio
);
}

/**
 * Verifies the took-time policy against a multi-segment tiered cache.
 * <p>
 * Phase one sets a very high took-time threshold so that entries evicted from the on-heap tier are
 * not spilled to disk, and checks that evictions are reported. Phase two lowers the threshold to
 * zero so the disk tier is used, and checks that no further evictions are reported.
 *
 * @throws Exception if a cluster or index operation fails
 */
public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
    int numberOfSegments = getNumberOfSegments();
    // Each cache entry below is roughly 700 bytes, so keep the per-segment budget slightly higher
    // so that every segment can hold at least one entry.
    int onHeapCacheSizePerSegmentInBytes = 800;
    int onHeapCacheSizeInBytes = onHeapCacheSizePerSegmentInBytes * numberOfSegments;
    internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments)).build());
    Client client = client();
    assertAcked(
        client.admin()
            .indices()
            .prepareCreate("index")
            .setMapping("k", "type=keyword")
            .setSettings(
                Settings.builder()
                    .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                    .put("index.refresh_interval", -1)
            )
            .get()
    );

    // Both policy updates below target the same concrete setting key.
    final String tookTimePolicyKey = TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(
        CacheType.INDICES_REQUEST_CACHE
    ).getKey();

    // Set a very high took-time threshold so that no item evicted from the on-heap cache is spilled
    // to disk, then issue requests so that a few items get cached.
    ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
        Settings.builder().put(tookTimePolicyKey, new TimeValue(100, TimeUnit.SECONDS)).build()
    );
    assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());

    // Even in the best case, where all keys are spread across different segments, at least one
    // segment ends up with two entries, so evictions are expected.
    int numberOfIndexedItems = numberOfSegments + 1;
    for (int i = 0; i < numberOfIndexedItems; i++) {
        indexRandom(true, client.prepareIndex("index").setSource("k" + i, "hello" + i));
    }
    ensureSearchable("index");
    refreshAndWaitForReplication();
    // Force merge the index to ensure there can be no background merges during the subsequent
    // searches that would invalidate the cache.
    ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
    OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);

    for (int i = 0; i < numberOfIndexedItems; i++) {
        SearchResponse resp = client.prepareSearch("index")
            .setRequestCache(true)
            .setQuery(QueryBuilders.termQuery("k" + i, "hello" + i))
            .get();
        assertSearchResponse(resp);
    }
    RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
    // The disk cache is not used because of the high took-time threshold, so the overall cache size
    // is expected to be less than or equal to the on-heap cache size.
    assertTrue(requestCacheStats.getMemorySizeInBytes() <= onHeapCacheSizeInBytes);
    assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
    // At least one eviction is expected, since the disk cache cannot hold anything under this policy.
    assertTrue(requestCacheStats.getEvictions() > 0);
    assertEquals(0, requestCacheStats.getHitCount());
    long lastEvictionSeen = requestCacheStats.getEvictions();

    // Lower the took-time threshold to zero so that the disk cache also comes into play. Now all
    // entries should be cacheable.
    updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
        Settings.builder().put(tookTimePolicyKey, new TimeValue(0, TimeUnit.MILLISECONDS)).build()
    );
    assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
    for (int i = 0; i < numberOfIndexedItems * 2; i++) {
        // Random field/value pairs make every query a distinct cache key.
        SearchResponse resp = client.prepareSearch("index")
            .setRequestCache(true)
            .setQuery(QueryBuilders.termQuery(UUID.randomUUID().toString(), UUID.randomUUID().toString()))
            .get();
        assertSearchResponse(resp);
    }

    requestCacheStats = getRequestCacheStats(client, "index");
    // No new evictions should be reported now.
    assertEquals(lastEvictionSeen, requestCacheStats.getEvictions());
}

public void testWithDynamicTookTimePolicy() throws Exception {
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b")).build());
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());
Client client = client();
assertAcked(
client.admin()
Expand Down Expand Up @@ -271,9 +330,10 @@ public void testWithDynamicTookTimePolicy() throws Exception {

public void testInvalidationWithIndicesRequestCache() throws Exception {
int onHeapCacheSizeInBytes = 2000;
int numberOfSegments = getNumberOfSegments();
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -354,10 +414,11 @@ public void testInvalidationWithIndicesRequestCache() throws Exception {
}

public void testWithExplicitCacheClear() throws Exception {
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -426,10 +487,13 @@ public void testWithExplicitCacheClear() throws Exception {
}

public void testWithDynamicDiskCacheSetting() throws Exception {
int onHeapCacheSizeInBytes = 10; // Keep it low so that all items are cached onto disk.
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizeInBytes = randomIntBetween(numberOfSegments + 1, numberOfSegments * 2); // Keep it low so
// that all items are
// cached onto disk.
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -540,6 +604,27 @@ public void testWithDynamicDiskCacheSetting() throws Exception {
assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount());
}

/**
 * Verifies that starting a node with a segment count that is not a power of two fails with an
 * {@link IllegalArgumentException} carrying the invalid-segment-count message.
 *
 * @throws Exception declared for consistency with the other tests in this class
 */
public void testWithInvalidSegmentNumberSetting() throws Exception {
    int numberOfSegments = getNumberOfSegments();
    // Arbitrary small on-heap size; this test only asserts on the startup failure.
    int onHeapCacheSizeInBytes = randomIntBetween(numberOfSegments + 1, numberOfSegments * 2);
    int invalidNumberOfSegments = 300; // Not a power of two.
    String expectedMessage = String.format(
        Locale.ROOT,
        INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE,
        TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
    );
    // The String-first overload of assertThrows treats that string only as the assertion-failure
    // message, so the thrown exception is captured here and its message is checked explicitly.
    IllegalArgumentException exception = assertThrows(
        IllegalArgumentException.class,
        () -> internalCluster().startNode(
            Settings.builder()
                .put(defaultSettings(onHeapCacheSizeInBytes + "b", invalidNumberOfSegments))
                .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
                .build()
        )
    );
    // NOTE(review): assumes the startup failure surfaces the validation message unwrapped - confirm.
    assertEquals(expectedMessage, exception.getMessage());
}

/**
 * Requests index stats with the request-cache section enabled and returns the request cache stats
 * from the response's total.
 *
 * @param client    client used to issue the stats request
 * @param indexName name of the index to fetch stats for
 * @return the request cache stats taken from the total section of the response
 */
private RequestCacheStats getRequestCacheStats(Client client, String indexName) {
    return client.admin()
        .indices()
        .prepareStats(indexName)
        .setRequestCache(true)
        .get()
        .getTotal()
        .getRequestCache();
}
Expand All @@ -550,7 +635,7 @@ public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000, false));
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 10000, false, 1));
}

@Override
Expand Down
Loading

0 comments on commit 86212cc

Please sign in to comment.