Skip to content

Commit

Permalink
Added concurrency test, which fails possibly due to outdated framework
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Alfonsi committed Oct 19, 2023
1 parent 8510637 commit 1601644
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 53 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class EhcacheDiskCachingTier implements DiskCachingTier<IndicesRequestCac
private final EhcacheEventListener listener;
private final IndicesRequestCache indicesRequestCache; // only used to create new Keys
private final String nodeId;
public int numGets; // debug for concurrency test
// private RBMIntKeyLookupStore keystore;
// private CacheTierPolicy[] policies;
// private IndicesRequestCacheDiskTierPolicy policy;
Expand All @@ -75,6 +76,7 @@ public EhcacheDiskCachingTier(
this.indicesRequestCache = indicesRequestCache;
this.nodeId = nodeId;
this.diskCacheFP = PathUtils.get(BASE_DISK_CACHE_FP, nodeId).toString();
this.numGets = 0; // debug
// I know this function warns it shouldn't often be used, we can fix it to use the roots once we pick a final FP

getManager();
Expand Down Expand Up @@ -148,13 +150,17 @@ public BytesReference get(IndicesRequestCache.Key key) {

// if (keystore.contains(key.hashCode()) {
long now = System.nanoTime();
numGets++;
BytesReference value = null;
try {
value = cache.get(new EhcacheKey(key));
} catch (IOException ignored) { // do smth with this later
} catch (IOException e) { // do smth with this later
System.out.println("Error in get!");
e.printStackTrace();
}
double tookTimeMillis = ((double) (System.nanoTime() - now)) / 1000000;
getTimeMillisEWMA.addValue(tookTimeMillis);

return value;
// }
// return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
Expand All @@ -34,6 +35,10 @@ public class TieredCacheSpilloverStrategyHandler<K extends Writeable, V> impleme
*/
private final List<CachingTier<K, V>> cachingTierList;

/*public AtomicInteger numGets; // debug only
public AtomicInteger numHeapGets;
public AtomicInteger numHeapHits;
public AtomicInteger numDiskHits;*/
private TieredCacheSpilloverStrategyHandler(
OnHeapCachingTier<K, V> onHeapCachingTier,
DiskCachingTier<K, V> diskCachingTier,
Expand All @@ -43,6 +48,10 @@ private TieredCacheSpilloverStrategyHandler(
this.diskCachingTier = Objects.requireNonNull(diskCachingTier);
this.tieredCacheEventListener = tieredCacheEventListener;
this.cachingTierList = Arrays.asList(onHeapCachingTier, diskCachingTier);
/*this.numGets = new AtomicInteger();
this.numHeapGets = new AtomicInteger();
this.numHeapHits = new AtomicInteger();
this.numDiskHits = new AtomicInteger();*/
setRemovalListeners();
}

Expand Down Expand Up @@ -118,8 +127,8 @@ public CachingTier<K, V> getOnHeapCachingTier() {
return this.onHeapCachingTier;
}

public DiskCachingTier<K, V> getDiskCachingTier() { // change to CachingTier after debug
return this.diskCachingTier;
/**
 * Returns the disk tier as its concrete {@code EhcacheDiskCachingTier} type rather than the
 * {@code CachingTier} interface, so tests can reach debug-only members on the concrete class.
 * NOTE(review): the unchecked downcast assumes the handler was constructed with an
 * EhcacheDiskCachingTier — confirm at the construction site. Per the inline comment, this
 * should be reverted to return {@code CachingTier} once the concurrency debugging is done.
 */
public EhcacheDiskCachingTier getDiskCachingTier() { // change to CachingTier after debug
return (EhcacheDiskCachingTier) this.diskCachingTier;
}

private void setRemovalListeners() {
Expand All @@ -131,9 +140,22 @@ private void setRemovalListeners() {
private Function<K, CacheValue<V>> getValueFromTierCache() {
return key -> {
for (CachingTier<K, V> cachingTier : cachingTierList) {
// counters are debug only
/*if (cachingTier.getTierType() == TierType.ON_HEAP) {
numHeapGets.incrementAndGet();
} else if (cachingTier.getTierType() == TierType.DISK) {
numGets.incrementAndGet();
}*/

V value = cachingTier.get(key);
if (value != null) {
tieredCacheEventListener.onHit(key, value, cachingTier.getTierType());
/*if (cachingTier.getTierType() == TierType.ON_HEAP) {
numHeapHits.incrementAndGet();
}
if (cachingTier.getTierType() == TierType.DISK) {
numDiskHits.incrementAndGet();
}*/
return new CacheValue<>(value, cachingTier.getTierType());
}
tieredCacheEventListener.onMiss(key, cachingTier.getTierType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.Randomness;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
Expand All @@ -67,8 +68,13 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase {
Expand Down Expand Up @@ -248,6 +254,83 @@ public void testDiskGetTimeEWMA() throws Exception {
cache.closeDiskTier();
}

/**
 * Concurrency smoke test for the disk caching tier.
 *
 * Sets the on-heap tier capacity to zero so every request spills straight to the disk tier,
 * then submits {@code numRequests} lookups from a small fixed thread pool in random order.
 * The last {@code numRepeats} requests reuse keys {@code 0..numRepeats-1}, so on completion
 * the disk tier should hold one entry per distinct key, with
 * {@code numRequests - numRepeats} misses and {@code numRepeats} hits.
 *
 * NOTE(review): the final two stat assertions are known to be flaky (hits occasionally one
 * short, misses one over) — see the commit message about a suspected framework race.
 */
public void testEhcacheConcurrency() throws Exception {
    ShardRequestCache requestCacheStats = new ShardRequestCache();
    Settings.Builder settingsBuilder = Settings.builder();
    long heapSizeBytes = 0; // zero heap capacity -> requests skip directly to the disk cache
    settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes));
    IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build(), getInstanceFromNode(IndicesService.class));

    Directory dir = newDirectory();
    IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
    writer.addDocument(newDoc(0, "foo"));
    DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
    AtomicBoolean indexShard = new AtomicBoolean(true);

    TestEntity entity = new TestEntity(requestCacheStats, indexShard);
    Loader loader = new Loader(reader, 0);

    Random rand = Randomness.get();
    int minThreads = 6;
    int maxThreads = 8;
    // nextInt(bound) is exclusive, so add 1 to make maxThreads reachable
    // (previously an off-by-one capped the pool at maxThreads - 1 threads).
    int numThreads = rand.nextInt(maxThreads - minThreads + 1) + minThreads;
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
    try {
        int numRequests = 50;
        int numRepeats = 10;
        BytesReference[] termBytesArr = new BytesReference[numRequests];
        ArrayList<Integer> permutation = new ArrayList<>();

        // Build numRequests request ids; the last numRepeats reuse keys 0..numRepeats-1.
        for (int i = 0; i < numRequests; i++) {
            int searchValue = i;
            if (i > numRequests - numRepeats - 1) {
                searchValue = i - (numRequests - numRepeats); // repeat values 0..numRepeats-1
            }
            permutation.add(searchValue);
            if (i == searchValue) {
                // Build the serialized term query only once per distinct key.
                TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(searchValue));
                BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
                termBytesArr[i] = termBytes;
            }
        }
        // Shuffle with the test's seeded Randomness source so failures reproduce from the seed
        // (the no-arg overload uses an unseeded default Random).
        java.util.Collections.shuffle(permutation, rand);

        ArrayList<Future<BytesReference>> futures = new ArrayList<>();
        for (int j = 0; j < permutation.size(); j++) {
            int keyNumber = permutation.get(j);
            futures.add(executor.submit(() -> cache.getOrCompute(entity, loader, reader, termBytesArr[keyNumber])));
        }

        // Join every lookup; each request must produce a value.
        for (Future<BytesReference> fut : futures) {
            assertNotNull(fut.get());
        }

        System.out.println("heap size " + cache.tieredCacheHandler.count(TierType.ON_HEAP));
        System.out.println("disk size " + cache.tieredCacheHandler.count(TierType.DISK));
        System.out.println("disk misses " + requestCacheStats.stats(TierType.DISK).getMissCount());
        System.out.println("disk hits " + requestCacheStats.stats(TierType.DISK).getHitCount());

        assertEquals(numRequests - numRepeats, cache.tieredCacheHandler.count(TierType.DISK)); // correct
        assertEquals(numRequests - numRepeats, requestCacheStats.stats(TierType.DISK).getMissCount()); // usually correctly 40, sometimes 41
        assertEquals(numRepeats, requestCacheStats.stats(TierType.DISK).getHitCount()); // should be 10, is usually 9
    } finally {
        executor.shutdown(); // previously leaked: the fixed pool was never shut down
        IOUtils.close(reader, writer, dir, cache);
        cache.closeDiskTier();
    }
}

public void testCacheDifferentReaders() throws Exception {
IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class));
AtomicBoolean indexShard = new AtomicBoolean(true);
Expand Down

0 comments on commit 1601644

Please sign in to comment.