Skip to content

Commit

Permalink
Add IT tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranprakash154 committed Dec 5, 2023
1 parent f22cdaf commit 280cb07
Showing 1 changed file with 253 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.time.Duration;
import java.time.Instant;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

// This is a separate file from IndicesRequestCacheIT because we only want to run our test
// on a node with a maximum request cache size that we set.

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase {
public void testDiskTierStats() throws Exception {
Expand Down Expand Up @@ -110,7 +112,257 @@ public void testDiskTierStats() throws Exception {
IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 3, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 2, numRequests + 1, TierType.DISK, false);
assertDiskTierSpecificStats(client, "index", 2, tookTimeSoFar, tookTimeSoFar);
}

public void testDiskTierInvalidationByCleanCacheAPI() throws Exception {
int cleanupIntervalInMillis = 10_000_000; // setting this intentionally high so that we don't get background cleanups
int heapSizeBytes = 9876;
String node = internalCluster().startNode(
Settings.builder()
.put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(cleanupIntervalInMillis))
.put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "0%")
.put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
.put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time
);
Client client = client(node);

Settings.Builder indicesSettingBuilder = Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

assertAcked(
client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
);

indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp;

resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
assertTrue(heapSizeBytes > requestSize);
// If this fails, increase heapSizeBytes! We can't adjust it after getting the size of one query
// as the cache size setting is not dynamic
int numOnDisk = 2;
int numRequests = heapSizeBytes / requestSize + numOnDisk;
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
for (int i = 1; i < numRequests; i++) {
requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
assertSearchResponse(resp);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
}

requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();

long entries = requestCacheStats.getEntries(TierType.DISK);
// make sure we have 2 entries in disk.
assertEquals(2, entries);

// call clear cache api
client.admin().indices().prepareClearCache().setIndices("index").setRequestCache(true).get();
// fetch the stats again
requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
entries = requestCacheStats.getEntries(TierType.DISK);
// make sure we have 0 entries in disk.
assertEquals(0, entries);
}

// When entire disk tier is stale, test whether cache cleaner cleans up everything from disk
public void testDiskTierInvalidationByCacheCleanerEntireDiskTier() throws Exception {
int thresholdInMillis = 4_000;
Instant start = Instant.now();
int heapSizeBytes = 9876;
String node = internalCluster().startNode(
Settings.builder()
.put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(thresholdInMillis))
.put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "1%")
.put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
.put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time
);
Client client = client(node);

Settings.Builder indicesSettingBuilder = Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

assertAcked(
client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
);

indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp;

resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
assertTrue(heapSizeBytes > requestSize);

int numOnDisk = 2;
int numRequests = heapSizeBytes / requestSize + numOnDisk;
for (int i = 1; i < numRequests; i++) {
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
assertSearchResponse(resp);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
}

RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();

// make sure we have 2 entries in disk.
long entries = requestCacheStats.getEntries(TierType.DISK);
assertEquals(2, entries);

// index a doc and force refresh so that the cache cleaner can clean the cache
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");

// sleep for the threshold time, so that the cache cleaner can clean the cache
Instant end = Instant.now();
long elapsedTimeMillis = Duration.between(start, end).toMillis();
// if this test is flaky, increase the sleep time.
long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 1_000;
Thread.sleep(sleepTime);

// by now cache cleaner would have run and cleaned up stale keys
// fetch the stats again
requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();

// make sure we have 0 entries in disk.
entries = requestCacheStats.getEntries(TierType.DISK);
assertEquals(0, entries);
}

// When part of disk tier is stale, test whether cache cleaner cleans up only stale items from disk
public void testDiskTierInvalidationByCacheCleanerPartOfDiskTier() throws Exception {
int thresholdInMillis = 4_000;
Instant start = Instant.now();
int heapSizeBytes = 987;
String node = internalCluster().startNode(
Settings.builder()
.put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(thresholdInMillis))
.put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "1%")
.put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
.put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time
);
Client client = client(node);

Settings.Builder indicesSettingBuilder = Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

assertAcked(
client.admin().indices().prepareCreate("index").setMapping("k", "type=text").setSettings(indicesSettingBuilder).get()
);

indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
SearchResponse resp;

resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
assertTrue(heapSizeBytes > requestSize);

int numOnDisk = 2;
int numRequests = heapSizeBytes / requestSize + numOnDisk;
for (int i = 1; i < numRequests; i++) {
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
assertSearchResponse(resp);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
}

RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();

// make sure we have 2 entries in disk.
long entries = requestCacheStats.getEntries(TierType.DISK);
assertEquals(2, entries);

// force refresh so that it creates stale keys in the cache for the cache cleaner to pick up.
flushAndRefresh("index");
client().prepareIndex("index").setId("1").setSource("k", "good bye");
ensureSearchable("index");

for (int i = 0; i < 6; i++) { // index 5 items with the new readerCacheKeyId
client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
}

// fetch the stats again
requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();

entries = requestCacheStats.getEntries(TierType.DISK);

// sleep for the threshold time, so that the cache cleaner can clean the cache
Instant end = Instant.now();
long elapsedTimeMillis = Duration.between(start, end).toMillis();
// if this test is flaky, increase the sleep time.
long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 5_000;
Thread.sleep(sleepTime);

// fetch the stats again
requestCacheStats = client.admin()
.indices()
.prepareStats("index")
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();

// make sure we have 5 entries in disk.
entries = requestCacheStats.getEntries(TierType.DISK);
assertEquals(5, entries);
}

private long getCacheSizeBytes(Client client, String index, TierType tierType) {
Expand Down

0 comments on commit 280cb07

Please sign in to comment.