diff --git a/server/src/main/java/org/opensearch/indices/DummySerializableKey.java b/server/src/main/java/org/opensearch/indices/DummySerializableKey.java
deleted file mode 100644
index 7f2888f6e65f7..0000000000000
--- a/server/src/main/java/org/opensearch/indices/DummySerializableKey.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.indices;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-public class DummySerializableKey implements Serializable {
-    private Integer i;
-    private String s;
-    public DummySerializableKey(Integer i, String s) {
-        this.i = i;
-        this.s = s;
-    }
-
-    public int getI() {
-        return i;
-    }
-    public String getS() {
-        return s;
-    }
-    @Override
-    public boolean equals(Object o) {
-        if (o == this) {
-            return true;
-        }
-        if (!(o instanceof DummySerializableKey)) {
-            return false;
-        }
-        DummySerializableKey other = (DummySerializableKey) o;
-        return Objects.equals(this.i, other.i) && this.s.equals(other.s);
-    }
-    @Override
-    public final int hashCode() {
-        int result = 11;
-        if (i != null) {
-            result = 31 * result + i.hashCode();
-        }
-        if (s != null) {
-            result = 31 * result + s.hashCode();
-        }
-        return result;
-    }
-}
diff --git a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java
index 01fe6d491a58a..6ccbb68515b50 100644
--- a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java
+++ b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java
@@ -60,6 +60,7 @@ public class EhcacheDiskCachingTier<K, V> implements DiskCachingTier<K, V> impleme
      */
     private final List<CachingTier<K, V>> cachingTierList;
+    /*public AtomicInteger numGets; // debug only
+    public AtomicInteger numHeapGets;
+    public AtomicInteger numHeapHits;
+    public AtomicInteger numDiskHits;*/
     private TieredCacheSpilloverStrategyHandler(
         OnHeapCachingTier<K, V> onHeapCachingTier,
         DiskCachingTier<K, V> diskCachingTier,
@@ -43,6 +48,10 @@ private TieredCacheSpilloverStrategyHandler(
         this.diskCachingTier = Objects.requireNonNull(diskCachingTier);
         this.tieredCacheEventListener = tieredCacheEventListener;
         this.cachingTierList = Arrays.asList(onHeapCachingTier, diskCachingTier);
+        /*this.numGets = new AtomicInteger();
+        this.numHeapGets = new AtomicInteger();
+        this.numHeapHits = new AtomicInteger();
+        this.numDiskHits = new AtomicInteger();*/
         setRemovalListeners();
     }

@@ -118,8 +127,8 @@ public CachingTier<K, V> getOnHeapCachingTier() {
         return this.onHeapCachingTier;
     }

-    public DiskCachingTier<K, V> getDiskCachingTier() { // change to CachingTier after debug
-        return this.diskCachingTier;
+    public EhcacheDiskCachingTier<K, V> getDiskCachingTier() { // change to CachingTier after debug
+        return (EhcacheDiskCachingTier<K, V>) this.diskCachingTier;
     }

     private void setRemovalListeners() {
@@ -131,9 +140,22 @@ private Function<K, CacheValue<V>> getValueFromTierCache() {
         return key -> {
             for (CachingTier<K, V> cachingTier : cachingTierList) {
+                // counters are debug only
+                /*if (cachingTier.getTierType() == TierType.ON_HEAP) {
+                    numHeapGets.incrementAndGet();
+                } else if (cachingTier.getTierType() == TierType.DISK) {
+                    numGets.incrementAndGet();
+                }*/
+
                 V value = cachingTier.get(key);
                 if (value != null) {
                     tieredCacheEventListener.onHit(key, value, cachingTier.getTierType());
+                    /*if (cachingTier.getTierType() == TierType.ON_HEAP) {
+                        numHeapHits.incrementAndGet();
+                    }
+                    if (cachingTier.getTierType() == TierType.DISK) {
+                        numDiskHits.incrementAndGet();
+                    }*/
                     return new CacheValue<>(value, cachingTier.getTierType());
                 }
                 tieredCacheEventListener.onMiss(key, cachingTier.getTierType());
diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
index 2bd2bd53a787a..a68f0795c2e2c 100644
--- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
+++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
@@ -46,6 +46,7 @@
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 import org.opensearch.common.CheckedSupplier;
+import org.opensearch.common.Randomness;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
 import org.opensearch.common.settings.Settings;
@@ -67,8 +68,13 @@

 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;

 public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase {
@@ -248,6 +254,83 @@ public void testDiskGetTimeEWMA() throws Exception {
         cache.closeDiskTier();
     }

+    public void testEhcacheConcurrency() throws Exception {
+        ShardRequestCache requestCacheStats = new ShardRequestCache();
+        Settings.Builder settingsBuilder = Settings.builder();
+        long heapSizeBytes = 0; // skip directly to disk cache
+        settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes));
+        IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build(), getInstanceFromNode(IndicesService.class));
+
+        Directory dir = newDirectory();
+        IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
+        writer.addDocument(newDoc(0, "foo"));
+        DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
+        AtomicBoolean indexShard = new AtomicBoolean(true);
+
+        TestEntity entity = new TestEntity(requestCacheStats, indexShard);
+        Loader loader = new Loader(reader, 0);
+
+        Random rand = Randomness.get();
+        int minThreads = 6;
+        int maxThreads = 8;
+        int numThreads = rand.nextInt(maxThreads - minThreads) + minThreads;
+        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
+        int numRequests = 50;
+        int numRepeats = 10;
+        BytesReference[] termBytesArr = new BytesReference[numRequests];
+        ArrayList<Integer> permutation = new ArrayList<>();
+
+        // Have these threads make 50 requests, with 10 repeats, in random order, and keep track of the keys.
+        // At the end, make sure that all the keys are in the cache, there are 40 misses, and 10 hits.
+
+        for (int i = 0; i < numRequests; i++) {
+            int searchValue = i;
+            if (i > numRequests - numRepeats - 1) {
+                searchValue = i - (numRequests - numRepeats); // repeat values 0-9
+            }
+            //System.out.println("values: " + i + " " + searchValue);
+            permutation.add(searchValue);
+            if (i == searchValue) {
+                TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(searchValue));
+                BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
+                termBytesArr[i] = termBytes;
+            }
+        }
+        java.util.Collections.shuffle(permutation);
+
+        ArrayList<Future<BytesReference>> futures = new ArrayList<>();
+
+        for (int j = 0; j < permutation.size(); j++) {
+            int keyNumber = permutation.get(j);
+            Future<BytesReference> fut = executor.submit(() -> cache.getOrCompute(entity, loader, reader, termBytesArr[keyNumber]));
+            futures.add(fut);
+        }
+
+        // now go thru and get them all
+        for (Future<BytesReference> fut : futures) {
+            BytesReference value = fut.get();
+            assertNotNull(value);
+        }
+
+        System.out.println("heap size " + cache.tieredCacheHandler.count(TierType.ON_HEAP));
+        System.out.println("disk size " + cache.tieredCacheHandler.count(TierType.DISK));
+        System.out.println("disk misses " + requestCacheStats.stats(TierType.DISK).getMissCount());
+        System.out.println("disk hits " + requestCacheStats.stats(TierType.DISK).getHitCount());
+        /*System.out.println("disk num gets " + cache.tieredCacheHandler.getDiskCachingTier().numGets);
+        System.out.println("handler num get " + cache.tieredCacheHandler.numGets.intValue());
+        System.out.println("handler num heap get " + cache.tieredCacheHandler.numHeapGets.intValue());
+        System.out.println("handler num heap hit " + cache.tieredCacheHandler.numHeapHits.intValue());
+        System.out.println("handler num disk hit " + cache.tieredCacheHandler.numDiskHits.intValue());*/
+
+        assertEquals(numRequests - numRepeats, cache.tieredCacheHandler.count(TierType.DISK)); // correct
+        assertEquals(numRequests - numRepeats, requestCacheStats.stats(TierType.DISK).getMissCount()); // usually correctly 40, sometimes 41
+        assertEquals(numRepeats, requestCacheStats.stats(TierType.DISK).getHitCount()); // should be 10, is usually 9
+
+        IOUtils.close(reader, writer, dir, cache);
+        cache.closeDiskTier();
+
+    }
+
     public void testCacheDifferentReaders() throws Exception {
         IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class));
         AtomicBoolean indexShard = new AtomicBoolean(true);
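A note on the handler hunks above: the lookup built by getValueFromTierCache is a tier-ordered fall-through. It walks the caching tiers in list order (on-heap first, then disk), records a miss for each tier that comes up empty, and returns the first value it finds together with that tier's type. The following is a minimal, standalone sketch of that pattern; the nested TierType, CachingTier, and MapTier types here are simplified stand-ins invented for this example, with plain prints in place of the event-listener callbacks, and are not the interfaces from the diff.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TierLookupSketch {

    enum TierType { ON_HEAP, DISK }

    interface CachingTier<K, V> {
        V get(K key);
        void put(K key, V value);
        TierType getTierType();
    }

    /** Trivial map-backed tier, standing in for the on-heap tier and the Ehcache-backed disk tier. */
    static class MapTier<K, V> implements CachingTier<K, V> {
        private final Map<K, V> map = new HashMap<>();
        private final TierType tierType;

        MapTier(TierType tierType) {
            this.tierType = tierType;
        }

        public V get(K key) { return map.get(key); }
        public void put(K key, V value) { map.put(key, value); }
        public TierType getTierType() { return tierType; }
    }

    public static void main(String[] args) {
        CachingTier<String, String> onHeap = new MapTier<>(TierType.ON_HEAP);
        CachingTier<String, String> disk = new MapTier<>(TierType.DISK);
        List<CachingTier<String, String>> cachingTierList = Arrays.asList(onHeap, disk);

        disk.put("k1", "v1"); // the value lives only in the lower tier

        // Tier-ordered fall-through: report a miss for every tier that comes up
        // empty, and a hit for the first tier that actually holds the key.
        String found = null;
        for (CachingTier<String, String> tier : cachingTierList) {
            String value = tier.get("k1");
            if (value != null) {
                System.out.println("hit in tier " + tier.getTierType());
                found = value;
                break;
            }
            System.out.println("miss in tier " + tier.getTierType());
        }
        System.out.println("resolved value: " + found);
        // prints: miss in tier ON_HEAP, hit in tier DISK, resolved value: v1
    }
}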
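On the flakiness the assertion comments describe ("usually correctly 40, sometimes 41", "should be 10, is usually 9"): that off-by-one is the classic signature of a check-then-act race, where two threads resolving the same cold key both observe a miss before either write lands, so one expected hit is counted as an extra miss. The sketch below reproduces that race with a plain map and shows one common way to make such counts deterministic: memoizing a per-key future via ConcurrentHashMap.computeIfAbsent, so exactly one caller computes the value and records the miss. RacyCache and MemoizedCache are hypothetical illustrations, not the PR's IndicesRequestCache or TieredCacheSpilloverStrategyHandler code.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GetOrComputeRaceSketch {

    /** Check-then-act: two threads asking for the same cold key can both count a miss. */
    static class RacyCache {
        final Map<String, String> store = new ConcurrentHashMap<>();
        final AtomicInteger misses = new AtomicInteger();
        final AtomicInteger hits = new AtomicInteger();

        String getOrCompute(String key) {
            String value = store.get(key);
            if (value == null) {
                misses.incrementAndGet();   // both racers can land here
                value = "computed-" + key;  // stand-in for the loader work
                store.put(key, value);
            } else {
                hits.incrementAndGet();
            }
            return value;
        }
    }

    /** Per-key future memoization: exactly one caller computes and records the miss. */
    static class MemoizedCache {
        final Map<String, CompletableFuture<String>> store = new ConcurrentHashMap<>();
        final AtomicInteger misses = new AtomicInteger();
        final AtomicInteger hits = new AtomicInteger();

        String getOrCompute(String key) {
            boolean[] computed = new boolean[1];
            CompletableFuture<String> fut = store.computeIfAbsent(key, k -> {
                computed[0] = true;
                misses.incrementAndGet();
                return CompletableFuture.completedFuture("computed-" + k);
            });
            if (!computed[0]) {
                hits.incrementAndGet();
            }
            return fut.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RacyCache racy = new RacyCache();
        MemoizedCache memoized = new MemoizedCache();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 1000; i++) {
            String key = "key-" + (i / 10); // 100 distinct keys, 10 back-to-back requests each
            pool.submit(() -> racy.getOrCompute(key));
            pool.submit(() -> memoized.getOrCompute(key));
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        // Racy misses can exceed 100 under contention; memoized misses are exactly 100.
        System.out.println("racy:     misses=" + racy.misses + " hits=" + racy.hits);
        System.out.println("memoized: misses=" + memoized.misses + " hits=" + memoized.hits);
    }
}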