diff --git a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java index de88c44cc1e8f..b1b5b03ed13cd 100644 --- a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java +++ b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java @@ -13,4 +13,10 @@ public interface DiskCachingTier extends CachingTier { * Closes the disk tier. */ void close(); + + /** + * Get the EWMA time in milliseconds for a get(). + * @return + */ + double getTimeMillisEWMA(); } diff --git a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java index 190cfa1b9f7cc..ce0fc52c9a96e 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java @@ -9,7 +9,6 @@ package org.opensearch.indices; import org.ehcache.PersistentCacheManager; -import org.ehcache.config.CacheRuntimeConfiguration; import org.ehcache.config.builders.CacheConfigurationBuilder; import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder; import org.ehcache.config.builders.CacheManagerBuilder; @@ -36,17 +35,13 @@ public class EhcacheDiskCachingTier implements DiskCachingTier cache; - //private final Class keyType; // These are needed to pass to newCacheConfigurationBuilder - //private final Class> ehcacheKeyType; - //private final Class valueType; - public final static String DISK_CACHE_FP = "disk_cache_tier"; // this should probably be defined somewhere else since we need to change security.policy based on its value + public final static String DISK_CACHE_FP = "disk_cache_tier"; // Placeholder. this should probably be defined somewhere else, since we need to change security.policy based on its value private RemovalListener removalListener; private ExponentiallyWeightedMovingAverage getTimeMillisEWMA; private static final double GET_TIME_EWMA_ALPHA = 0.3; // This is the value used elsewhere in OpenSearch private static final int MIN_WRITE_THREADS = 0; private static final int MAX_WRITE_THREADS = 4; // Max number of threads for the PooledExecutionService which handles writes private static final String cacheAlias = "diskTier"; - private final boolean isPersistent; private CounterMetric count; // number of entries in cache private final EhcacheEventListener listener; private final IndicesRequestCache indicesRequestCache; // only used to create new Keys @@ -55,24 +50,23 @@ public class EhcacheDiskCachingTier implements DiskCachingTier keyType, - //Class> ehcacheKeyType, - //Class valueType ) { - this.isPersistent = isPersistent; - //this.keyType = keyType; - //this.ehcacheKeyType = ehcacheKeyType; - //this.valueType = valueType; this.count = new CounterMetric(); this.listener = new EhcacheEventListener(this, this); this.indicesRequestCache = indicesRequestCache; getManager(); - getOrCreateCache(isPersistent, maxWeightInBytes); + try { + cacheManager.destroyCache(cacheAlias); + } catch (Exception e) { + System.out.println("Unable to destroy cache!!"); + e.printStackTrace(); + // do actual logging later + } + createCache(maxWeightInBytes); this.getTimeMillisEWMA = new ExponentiallyWeightedMovingAverage(GET_TIME_EWMA_ALPHA, 10); // this.keystore = new RBMIntKeyLookupStore((int) Math.pow(2, 28), maxKeystoreWeightInBytes); @@ -83,19 +77,31 @@ public EhcacheDiskCachingTier( public static void getManager() { // based on https://stackoverflow.com/questions/53756412/ehcache-org-ehcache-statetransitionexception-persistence-directory-already-lo // resolving double-initialization issue when using OpenSearchSingleNodeTestCase - if (cacheManager == null) { - PooledExecutionServiceConfiguration threadConfig = PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder() - .defaultPool("default", MIN_WRITE_THREADS, MAX_WRITE_THREADS) - .build(); - - cacheManager = CacheManagerBuilder.newCacheManagerBuilder() - .using(threadConfig) - .with(CacheManagerBuilder.persistence(DISK_CACHE_FP) - ).build(true); + if (cacheManager != null) { + try { + try { + cacheManager.close(); + } catch (IllegalStateException e) { + System.out.println("Cache was uninitialized, skipping close() and moving to destroy()"); + } + cacheManager.destroy(); + } catch (Exception e) { + System.out.println("Was unable to destroy cache manager"); + e.printStackTrace(); + // actual logging later + } } + PooledExecutionServiceConfiguration threadConfig = PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder() + .defaultPool("default", MIN_WRITE_THREADS, MAX_WRITE_THREADS) + .build(); + + cacheManager = CacheManagerBuilder.newCacheManagerBuilder() + .using(threadConfig) + .with(CacheManagerBuilder.persistence(DISK_CACHE_FP) + ).build(true); } - private void getOrCreateCache(boolean isPersistent, long maxWeightInBytes) { + private void createCache(long maxWeightInBytes) { // our EhcacheEventListener should receive events every time an entry is changed CacheEventListenerConfigurationBuilder listenerConfig = CacheEventListenerConfigurationBuilder .newEventListenerConfiguration(listener, @@ -107,30 +113,10 @@ private void getOrCreateCache(boolean isPersistent, long maxWeightInBytes) { .ordered().asynchronous(); // ordered() has some performance penalty as compared to unordered(), we can also use synchronous() - try { - cache = cacheManager.createCache(cacheAlias, - CacheConfigurationBuilder.newCacheConfigurationBuilder( - EhcacheKey.class, BytesReference.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, isPersistent)) - .withService(listenerConfig)); - } catch (IllegalArgumentException e) { - // Thrown when the cache already exists, which may happen in test cases - // In this case the listener is configured to send messages to some other disk tier instance, which we don't want - // (it was set up unnecessarily by the test case) - - // change config of existing cache to use this listener rather than the one instantiated by the test case - cache = cacheManager.getCache(cacheAlias, EhcacheKey.class, BytesReference.class); - // cache.getRuntimeConfiguration().cacheConfigurationListenerList contains the old listener, but it's private - // and theres no method to clear it unless you have the actual listener object, so it has to stay i think - - cache.getRuntimeConfiguration().registerCacheEventListener(listener, EventOrdering.ORDERED, EventFiring.ASYNCHRONOUS, - EnumSet.of( - EventType.EVICTED, - EventType.EXPIRED, - EventType.REMOVED, - EventType.UPDATED, - EventType.CREATED)); - int k = 1; - } + cache = cacheManager.createCache(cacheAlias, + CacheConfigurationBuilder.newCacheConfigurationBuilder( + EhcacheKey.class, BytesReference.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, false)) + .withService(listenerConfig)); } @Override @@ -209,7 +195,6 @@ public Iterable keys() { @Override public int count() { - int j = 0; return (int) count.count(); } @@ -230,33 +215,28 @@ public void onRemoval(RemovalNotification e IndicesRequestCache.Key key = tier.convertEhcacheKeyToOriginal(ehcacheKey); removalListener.onRemoval(new RemovalNotification<>(key, oldValue, reason)); } catch (Exception ignored) {} - } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index a38e9d71fb982..16aed8830cf84 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -129,10 +129,8 @@ public final class IndicesRequestCache implements TieredCacheEventListener().setOnHeapCachingTier( openSearchOnHeapCache ).setOnDiskCachingTier(diskCachingTier).setTieredCacheEventListener(this).build(); diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java index 0a70375790191..4816f94f7d619 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java +++ b/server/src/main/java/org/opensearch/indices/TieredCacheHandler.java @@ -23,4 +23,6 @@ public interface TieredCacheHandler { CachingTier getOnHeapCachingTier(); void closeDiskTier(); + + double diskGetTimeMillisEWMA(); } diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java index 6a4aa812cf010..caa4b108946ac 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java +++ b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java @@ -147,6 +147,11 @@ public void closeDiskTier() { diskCachingTier.close(); } + @Override + public double diskGetTimeMillisEWMA() { + return diskCachingTier.getTimeMillisEWMA(); + } + public static class CacheValue { V value; TierType source; diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 35da7bac938e6..2bd2bd53a787a 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -45,19 +45,6 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; -import org.ehcache.Cache; -import org.ehcache.PersistentCacheManager; -import org.ehcache.config.builders.CacheConfigurationBuilder; -import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder; -import org.ehcache.config.builders.CacheManagerBuilder; -import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder; -import org.ehcache.config.builders.ResourcePoolsBuilder; -import org.ehcache.config.units.MemoryUnit; -import org.ehcache.core.internal.statistics.DefaultStatisticsService; -import org.ehcache.core.spi.service.StatisticsService; -import org.ehcache.core.statistics.TierStatistics; -import org.ehcache.event.EventType; -import org.ehcache.impl.config.executor.PooledExecutionServiceConfiguration; import org.opensearch.common.CheckedSupplier; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; @@ -80,9 +67,7 @@ import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -196,18 +181,17 @@ public void testSpillover() throws Exception { TestEntity entity = new TestEntity(requestCacheStats, indexShard); Loader loader = new Loader(reader, 0); System.out.println("On-heap cache size at start = " + requestCacheStats.stats().getMemorySizeInBytes()); - IndicesRequestCache.Key[] keys = new IndicesRequestCache.Key[maxNumInHeap + 1]; + BytesReference[] termBytesArr = new BytesReference[maxNumInHeap + 1]; + for (int i = 0; i < maxNumInHeap + 1; i++) { TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(i)); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); String rKey = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); - keys[i] = cache.new Key(entity, termBytes, rKey); + termBytesArr[i] = termBytes; BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); - System.out.println("On-heap cache size after " + (i+1) + " queries = " + requestCacheStats.stats().getMemorySizeInBytes()); - System.out.println("Disk cache size after " + (i+1) + " queries = " + requestCacheStats.stats(TierType.DISK).getMemorySizeInBytes()); } - // attempt to get value from disk cache, the first key should have been evicted - BytesReference firstValue = cache.tieredCacheHandler.get(keys[0]); + // get value from disk cache, the first key should have been evicted + BytesReference firstValue = cache.getOrCompute(entity, loader, reader, termBytesArr[0]); assertEquals(maxNumInHeap * heapKeySize, requestCacheStats.stats().getMemorySizeInBytes()); // TODO: disk weight bytes @@ -219,7 +203,47 @@ public void testSpillover() throws Exception { assertEquals(maxNumInHeap, cache.tieredCacheHandler.count(TierType.ON_HEAP)); assertEquals(1, cache.tieredCacheHandler.count(TierType.DISK)); - // more? + // get a value from heap cache, second key should still be there + BytesReference secondValue = cache.getOrCompute(entity, loader, reader, termBytesArr[1]); + // get the value on disk cache again + BytesReference firstValueAgain = cache.getOrCompute(entity, loader, reader, termBytesArr[0]); + + assertEquals(1, requestCacheStats.stats().getEvictions()); + assertEquals(2, requestCacheStats.stats(TierType.DISK).getHitCount()); + assertEquals(maxNumInHeap + 1, requestCacheStats.stats(TierType.DISK).getMissCount()); + assertEquals(1, requestCacheStats.stats().getHitCount()); + assertEquals(maxNumInHeap + 3, requestCacheStats.stats().getMissCount()); + assertEquals(maxNumInHeap, cache.tieredCacheHandler.count(TierType.ON_HEAP)); + assertEquals(1, cache.tieredCacheHandler.count(TierType.DISK)); + + IOUtils.close(reader, writer, dir, cache); + cache.closeDiskTier(); + } + + public void testDiskGetTimeEWMA() throws Exception { + ShardRequestCache requestCacheStats = new ShardRequestCache(); + Settings.Builder settingsBuilder = Settings.builder(); + long heapSizeBytes = 0; // skip directly to disk cache + settingsBuilder.put("indices.requests.cache.size", new ByteSizeValue(heapSizeBytes)); + IndicesRequestCache cache = new IndicesRequestCache(settingsBuilder.build(), getInstanceFromNode(IndicesService.class)); + + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + AtomicBoolean indexShard = new AtomicBoolean(true); + + TestEntity entity = new TestEntity(requestCacheStats, indexShard); + Loader loader = new Loader(reader, 0); + + for (int i = 0; i < 50; i++) { + TermQueryBuilder termQuery = new TermQueryBuilder("id", String.valueOf(i)); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + // on my machine get time EWMA converges to ~0.025 ms, but it does have an SSD + assertTrue(cache.tieredCacheHandler.diskGetTimeMillisEWMA() > 0); + } + IOUtils.close(reader, writer, dir, cache); cache.closeDiskTier(); }