Skip to content

Commit

Permalink
Removed persistence, cleaned up
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Alfonsi committed Oct 17, 2023
1 parent 2d200fc commit 8f65db4
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@ public interface DiskCachingTier<K, V> extends CachingTier<K, V> {
* Closes the disk tier.
*/
void close();

/**
* Get the EWMA time in milliseconds for a get().
* @return
*/
double getTimeMillisEWMA();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices;

import org.ehcache.PersistentCacheManager;
import org.ehcache.config.CacheRuntimeConfiguration;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
Expand All @@ -36,17 +35,13 @@ public class EhcacheDiskCachingTier implements DiskCachingTier<IndicesRequestCac

public static PersistentCacheManager cacheManager;
private Cache<EhcacheKey, BytesReference> cache;
//private final Class<K> keyType; // These are needed to pass to newCacheConfigurationBuilder
//private final Class<EhcacheKey<K>> ehcacheKeyType;
//private final Class<V> valueType;
public final static String DISK_CACHE_FP = "disk_cache_tier"; // this should probably be defined somewhere else since we need to change security.policy based on its value
public final static String DISK_CACHE_FP = "disk_cache_tier"; // Placeholder. this should probably be defined somewhere else, since we need to change security.policy based on its value
private RemovalListener<IndicesRequestCache.Key, BytesReference> removalListener;
private ExponentiallyWeightedMovingAverage getTimeMillisEWMA;
private static final double GET_TIME_EWMA_ALPHA = 0.3; // This is the value used elsewhere in OpenSearch
private static final int MIN_WRITE_THREADS = 0;
private static final int MAX_WRITE_THREADS = 4; // Max number of threads for the PooledExecutionService which handles writes
private static final String cacheAlias = "diskTier";
private final boolean isPersistent;
private CounterMetric count; // number of entries in cache
private final EhcacheEventListener listener;
private final IndicesRequestCache indicesRequestCache; // only used to create new Keys
Expand All @@ -55,24 +50,23 @@ public class EhcacheDiskCachingTier implements DiskCachingTier<IndicesRequestCac
// private IndicesRequestCacheDiskTierPolicy policy;

public EhcacheDiskCachingTier(
boolean isPersistent,
long maxWeightInBytes,
long maxKeystoreWeightInBytes,
IndicesRequestCache indicesRequestCache
//Class<K> keyType,
//Class<EhcacheKey<K>> ehcacheKeyType,
//Class<V> valueType
) {
this.isPersistent = isPersistent;
//this.keyType = keyType;
//this.ehcacheKeyType = ehcacheKeyType;
//this.valueType = valueType;
this.count = new CounterMetric();
this.listener = new EhcacheEventListener(this, this);
this.indicesRequestCache = indicesRequestCache;

getManager();
getOrCreateCache(isPersistent, maxWeightInBytes);
try {
cacheManager.destroyCache(cacheAlias);
} catch (Exception e) {
System.out.println("Unable to destroy cache!!");
e.printStackTrace();
// do actual logging later
}
createCache(maxWeightInBytes);
this.getTimeMillisEWMA = new ExponentiallyWeightedMovingAverage(GET_TIME_EWMA_ALPHA, 10);

// this.keystore = new RBMIntKeyLookupStore((int) Math.pow(2, 28), maxKeystoreWeightInBytes);
Expand All @@ -83,19 +77,31 @@ public EhcacheDiskCachingTier(
public static void getManager() {
// based on https://stackoverflow.com/questions/53756412/ehcache-org-ehcache-statetransitionexception-persistence-directory-already-lo
// resolving double-initialization issue when using OpenSearchSingleNodeTestCase
if (cacheManager == null) {
PooledExecutionServiceConfiguration threadConfig = PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
.defaultPool("default", MIN_WRITE_THREADS, MAX_WRITE_THREADS)
.build();

cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
.using(threadConfig)
.with(CacheManagerBuilder.persistence(DISK_CACHE_FP)
).build(true);
if (cacheManager != null) {
try {
try {
cacheManager.close();
} catch (IllegalStateException e) {
System.out.println("Cache was uninitialized, skipping close() and moving to destroy()");
}
cacheManager.destroy();
} catch (Exception e) {
System.out.println("Was unable to destroy cache manager");
e.printStackTrace();
// actual logging later
}
}
PooledExecutionServiceConfiguration threadConfig = PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
.defaultPool("default", MIN_WRITE_THREADS, MAX_WRITE_THREADS)
.build();

cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
.using(threadConfig)
.with(CacheManagerBuilder.persistence(DISK_CACHE_FP)
).build(true);
}

private void getOrCreateCache(boolean isPersistent, long maxWeightInBytes) {
private void createCache(long maxWeightInBytes) {
// our EhcacheEventListener should receive events every time an entry is changed
CacheEventListenerConfigurationBuilder listenerConfig = CacheEventListenerConfigurationBuilder
.newEventListenerConfiguration(listener,
Expand All @@ -107,30 +113,10 @@ private void getOrCreateCache(boolean isPersistent, long maxWeightInBytes) {
.ordered().asynchronous();
// ordered() has some performance penalty as compared to unordered(), we can also use synchronous()

try {
cache = cacheManager.createCache(cacheAlias,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
EhcacheKey.class, BytesReference.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, isPersistent))
.withService(listenerConfig));
} catch (IllegalArgumentException e) {
// Thrown when the cache already exists, which may happen in test cases
// In this case the listener is configured to send messages to some other disk tier instance, which we don't want
// (it was set up unnecessarily by the test case)

// change config of existing cache to use this listener rather than the one instantiated by the test case
cache = cacheManager.getCache(cacheAlias, EhcacheKey.class, BytesReference.class);
// cache.getRuntimeConfiguration().cacheConfigurationListenerList contains the old listener, but it's private
// and theres no method to clear it unless you have the actual listener object, so it has to stay i think

cache.getRuntimeConfiguration().registerCacheEventListener(listener, EventOrdering.ORDERED, EventFiring.ASYNCHRONOUS,
EnumSet.of(
EventType.EVICTED,
EventType.EXPIRED,
EventType.REMOVED,
EventType.UPDATED,
EventType.CREATED));
int k = 1;
}
cache = cacheManager.createCache(cacheAlias,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
EhcacheKey.class, BytesReference.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, false))
.withService(listenerConfig));
}

@Override
Expand Down Expand Up @@ -209,7 +195,6 @@ public Iterable<IndicesRequestCache.Key> keys() {

@Override
public int count() {
int j = 0;
return (int) count.count();
}

Expand All @@ -230,40 +215,36 @@ public void onRemoval(RemovalNotification<IndicesRequestCache.Key, BytesReferenc
removalListener.onRemoval(notification);
}

@Override
public double getTimeMillisEWMA() {
return getTimeMillisEWMA.getAverage();
}

public boolean isPersistent() {
return isPersistent;
}

public IndicesRequestCache.Key convertEhcacheKeyToOriginal(EhcacheKey eKey) throws IOException {
BytesStreamInput is = new BytesStreamInput();
byte[] bytes = eKey.getBytes();
is.readBytes(bytes, 0, bytes.length);
// we somehow have to use the Reader thing in the Writeable interface
// otherwise its not generic
try {
return indicesRequestCache.new Key(is);
} catch (Exception e) {
System.out.println("Was unable to reconstruct EhcacheKey into Key");
e.printStackTrace();
// actual logging later
}
return null;
}

@Override
public void close() {
// Call this method after each test, otherwise the directory will stay locked and you won't be able to
// initialize another IndicesRequestCache (for example in the next test that runs)
// Should be called after each test
cacheManager.removeCache(cacheAlias);
cacheManager.close();
}

public void destroy() throws Exception {
// Close the cache and delete any persistent data associated with it
// Might also be appropriate after standalone tests
cacheManager.close();
cacheManager.destroy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,5 @@ public void onEvent(CacheEvent<? extends EhcacheKey, ? extends BytesReference> e
IndicesRequestCache.Key key = tier.convertEhcacheKeyToOriginal(ehcacheKey);
removalListener.onRemoval(new RemovalNotification<>(key, oldValue, reason));
} catch (Exception ignored) {}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,8 @@ public final class IndicesRequestCache implements TieredCacheEventListener<Indic
).setMaximumWeight(sizeInBytes).setExpireAfterAccess(expire).build();

int diskTierWeight = 100 * 1048576; // 100 MB, for testing only

// changed to Integer for testing of bulk writes
EhcacheDiskCachingTier diskCachingTier;
diskCachingTier = new EhcacheDiskCachingTier(false, diskTierWeight, 0, this); // making non-persistent for now
diskCachingTier = new EhcacheDiskCachingTier(diskTierWeight, 0, this);
tieredCacheHandler = new TieredCacheSpilloverStrategyHandler.Builder<Key, BytesReference>().setOnHeapCachingTier(
openSearchOnHeapCache
).setOnDiskCachingTier(diskCachingTier).setTieredCacheEventListener(this).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public interface TieredCacheHandler<K, V> {
CachingTier<K, V> getOnHeapCachingTier();

void closeDiskTier();

double diskGetTimeMillisEWMA();
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public void closeDiskTier() {
diskCachingTier.close();
}

@Override
public double diskGetTimeMillisEWMA() {
return diskCachingTier.getTimeMillisEWMA();
}

public static class CacheValue<V> {
V value;
TierType source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,6 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.ehcache.Cache;
import org.ehcache.PersistentCacheManager;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.MemoryUnit;
import org.ehcache.core.internal.statistics.DefaultStatisticsService;
import org.ehcache.core.spi.service.StatisticsService;
import org.ehcache.core.statistics.TierStatistics;
import org.ehcache.event.EventType;
import org.ehcache.impl.config.executor.PooledExecutionServiceConfiguration;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
Expand All @@ -80,9 +67,7 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -196,18 +181,17 @@ public void testSpillover() throws Exception {
TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);
System.out.println("On-heap cache size at start = " + requestCacheStats.stats().getMemorySizeInBytes());
IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[maxNumInHeap + 1];
BytesReference[] termBytesArr = new BytesReference[maxNumInHeap + 1];

for (int i = 0; i < maxNumInHeap + 1; i++) {
TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(i));
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
String rKey = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString();
keys[i] = cache.new Key(entity, termBytes, rKey);
termBytesArr[i] = termBytes;
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
System.out.println("On-heap cache size after " + (i+1) + " queries = " + requestCacheStats.stats().getMemorySizeInBytes());
System.out.println("Disk cache size after " + (i+1) + " queries = " + requestCacheStats.stats(TierType.DISK).getMemorySizeInBytes());
}
// attempt to get value from disk cache, the first key should have been evicted
BytesReference firstValue = cache.tieredCacheHandler.get(keys[0]);
// get value from disk cache, the first key should have been evicted
BytesReference firstValue = cache.getOrCompute(entity, loader, reader, termBytesArr[0]);

assertEquals(maxNumInHeap * heapKeySize, requestCacheStats.stats().getMemorySizeInBytes());
// TODO: disk weight bytes
Expand All @@ -219,7 +203,47 @@ public void testSpillover() throws Exception {
assertEquals(maxNumInHeap, cache.tieredCacheHandler.count(TierType.ON_HEAP));
assertEquals(1, cache.tieredCacheHandler.count(TierType.DISK));

// more?
// get a value from heap cache, second key should still be there
BytesReference secondValue = cache.getOrCompute(entity, loader, reader, termBytesArr[1]);
// get the value on disk cache again
BytesReference firstValueAgain = cache.getOrCompute(entity, loader, reader, termBytesArr[0]);

assertEquals(1, requestCacheStats.stats().getEvictions());
assertEquals(2, requestCacheStats.stats(TierType.DISK).getHitCount());
assertEquals(maxNumInHeap + 1, requestCacheStats.stats(TierType.DISK).getMissCount());
assertEquals(1, requestCacheStats.stats().getHitCount());
assertEquals(maxNumInHeap + 3, requestCacheStats.stats().getMissCount());
assertEquals(maxNumInHeap, cache.tieredCacheHandler.count(TierType.ON_HEAP));
assertEquals(1, cache.tieredCacheHandler.count(TierType.DISK));

IOUtils.close(reader, writer, dir, cache);
cache.closeDiskTier();
}

public void testDiskGetTimeEWMA() throws Exception {
ShardRequestCache requestCacheStats = new ShardRequestCache();
Settings.Builder settingsBuilder = Settings.builder();
long heapSizeBytes = 0; // skip directly to disk cache
settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes));
IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build(), getInstanceFromNode(IndicesService.class));

Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
AtomicBoolean indexShard = new AtomicBoolean(true);

TestEntity entity = new TestEntity(requestCacheStats, indexShard);
Loader loader = new Loader(reader, 0);

for (int i = 0; i < 50; i++) {
TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(i));
BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
// on my machine get time EWMA converges to ~0.025 ms, but it does have an SSD
assertTrue(cache.tieredCacheHandler.diskGetTimeMillisEWMA() > 0);
}

IOUtils.close(reader, writer, dir, cache);
cache.closeDiskTier();
}
Expand Down

0 comments on commit 8f65db4

Please sign in to comment.