diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 7b64a7e93fe27..e8a3e7985703c 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -11,9 +11,11 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.ICacheKey; import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.stats.CacheStats; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -44,7 +46,14 @@ public class TieredSpilloverCache implements ICache { private final ICache diskCache; private final ICache onHeapCache; - private final RemovalListener removalListener; + + // TODO: Listeners for removals from the two tiers + // private final RemovalListener, V> onDiskRemovalListener; + // private final RemovalListener, V> onHeapRemovalListener; + + // The listener for removals from the spillover cache as a whole + private final RemovalListener, V> removalListener; + private final CacheStats stats; ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock()); ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock()); @@ -58,27 +67,27 @@ public class TieredSpilloverCache implements ICache { Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null"); this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null"); - this.onHeapCache = builder.onHeapCacheFactory.create( - new CacheConfig.Builder().setRemovalListener(new RemovalListener() { - @Override - public void onRemoval(RemovalNotification notification) { - try (ReleasableLock ignore = writeLock.acquire()) { - diskCache.put(notification.getKey(), notification.getValue()); - } - removalListener.onRemoval(notification); + this.onHeapCache = builder.onHeapCacheFactory.create(new CacheConfig.Builder().setRemovalListener(new RemovalListener<>() { + @Override + public void onRemoval(RemovalNotification, V> notification) { + try (ReleasableLock ignore = writeLock.acquire()) { + diskCache.put(notification.getKey(), notification.getValue()); } - }) - .setKeyType(builder.cacheConfig.getKeyType()) - .setValueType(builder.cacheConfig.getValueType()) - .setSettings(builder.cacheConfig.getSettings()) - .setWeigher(builder.cacheConfig.getWeigher()) - .build(), + removalListener.onRemoval(notification); + } + }) + .setKeyType(builder.cacheConfig.getKeyType()) + .setValueType(builder.cacheConfig.getValueType()) + .setSettings(builder.cacheConfig.getSettings()) + .setWeigher(builder.cacheConfig.getWeigher()) + .build(), builder.cacheType, builder.cacheFactories ); this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories); this.cacheList = Arrays.asList(onHeapCache, diskCache); + this.stats = null; // TODO } // Package private for testing @@ -92,20 +101,19 @@ ICache getDiskCache() { } @Override - public V get(K key) { + public V get(ICacheKey key) { return getValueFromTieredCache().apply(key); } @Override - public void put(K key, V value) { + public void put(ICacheKey key, V value) { try (ReleasableLock ignore = writeLock.acquire()) { onHeapCache.put(key, value); } } @Override - public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { - + public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { V cacheValue = getValueFromTieredCache().apply(key); if (cacheValue == null) { // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside. @@ -121,7 +129,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Except } @Override - public void invalidate(K key) { + public void invalidate(ICacheKey key) { // We are trying to invalidate the key from all caches though it would be present in only of them. // Doing this as we don't know where it is located. We could do a get from both and check that, but what will // also trigger a hit/miss listener event, so ignoring it for now. @@ -147,7 +155,7 @@ public void invalidateAll() { */ @SuppressWarnings("unchecked") @Override - public Iterable keys() { + public Iterable> keys() { return Iterables.concat(onHeapCache.keys(), diskCache.keys()); } @@ -176,7 +184,11 @@ public void close() throws IOException { } } - private Function getValueFromTieredCache() { + public CacheStats stats() { + return stats; + } + + private Function, V> getValueFromTieredCache() { return key -> { try (ReleasableLock ignore = readLock.acquire()) { for (ICache cache : cacheList) { @@ -254,7 +266,7 @@ public String getCacheName() { public static class Builder { private ICache.Factory onHeapCacheFactory; private ICache.Factory diskCacheFactory; - private RemovalListener removalListener; + private RemovalListener, V> removalListener; private CacheConfig cacheConfig; private CacheType cacheType; private Map cacheFactories; @@ -289,7 +301,7 @@ public Builder setDiskCacheFactory(ICache.Factory diskCacheFactory) { * @param removalListener Removal listener * @return builder */ - public Builder setRemovalListener(RemovalListener removalListener) { + public Builder setRemovalListener(RemovalListener, V> removalListener) { this.removalListener = removalListener; return this; } diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java index 19abf8ae63c28..6b0620c5fbede 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java @@ -31,7 +31,7 @@ public class TieredSpilloverCachePlugin extends Plugin implements CachePlugin { /** * Default constructor */ - TieredSpilloverCachePlugin() {} + public TieredSpilloverCachePlugin() {} @Override public Map getCacheFactoryMap() { diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java index 87851ca69dcae..e1a48ec6fdae9 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java @@ -10,9 +10,10 @@ import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.ICacheKey; import org.opensearch.common.cache.LoadAwareCacheLoader; -import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.stats.CacheStats; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; @@ -23,22 +24,14 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.test.OpenSearchTestCase; -import java.util.ArrayList; -import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; public class TieredSpilloverCacheTests extends OpenSearchTestCase { - - public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception { + // TODO: These tests are uncommented in the second stats rework PR, which adds a TSC stats implementation + /*public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception { int onHeapCacheSize = randomIntBetween(10, 30); int keyValueSize = 50; @@ -803,7 +796,7 @@ public String load(String key) { }); thread.start(); assertBusy(() -> { assertTrue(loadAwareCacheLoader.isLoaded()); }, 100, TimeUnit.MILLISECONDS); // We wait for new key to be loaded - // after which it eviction flow is + // after which it eviction flow is // guaranteed to occur. ICache onDiskCache = tieredSpilloverCache.getDiskCache(); @@ -879,7 +872,7 @@ private TieredSpilloverCache intializeTieredSpilloverCache( .setDiskCacheFactory(mockDiskCacheFactory) .setCacheConfig(cacheConfig) .build(); - } + }*/ } /** @@ -896,7 +889,7 @@ public OpenSearchOnHeapCacheWrapper(Builder builder) { } @Override - public V get(K key) { + public V get(ICacheKey key) { V value = super.get(key); if (value != null) { statsHolder.hitCount.inc(); @@ -907,13 +900,13 @@ public V get(K key) { } @Override - public void put(K key, V value) { + public void put(ICacheKey key, V value) { super.put(key, value); statsHolder.onCachedMetric.inc(); } @Override - public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { + public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { V value = super.computeIfAbsent(key, loader); if (loader.isLoaded()) { statsHolder.missCount.inc(); @@ -925,7 +918,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Except } @Override - public void invalidate(K key) { + public void invalidate(ICacheKey key) { super.invalidate(key); } @@ -935,7 +928,7 @@ public void invalidateAll() { } @Override - public Iterable keys() { + public Iterable> keys() { return super.keys(); } @@ -953,7 +946,7 @@ public void refresh() { public void close() {} @Override - public void onRemoval(RemovalNotification notification) { + public void onRemoval(RemovalNotification, V> notification) { super.onRemoval(notification); } @@ -989,24 +982,25 @@ class StatsHolder { class MockOnDiskCache implements ICache { - Map cache; + Map, V> cache; int maxSize; long delay; + CacheStats stats = null; // TODO MockOnDiskCache(int maxSize, long delay) { this.maxSize = maxSize; this.delay = delay; - this.cache = new ConcurrentHashMap(); + this.cache = new ConcurrentHashMap, V>(); } @Override - public V get(K key) { + public V get(ICacheKey key) { V value = cache.get(key); return value; } @Override - public void put(K key, V value) { + public void put(ICacheKey key, V value) { if (this.cache.size() >= maxSize) { // For simplification // eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, value, RemovalReason.EVICTED, // CacheStoreType.DISK)); @@ -1022,7 +1016,7 @@ public void put(K key, V value) { } @Override - public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { + public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { V value = cache.computeIfAbsent(key, key1 -> { try { return loader.load(key); @@ -1040,7 +1034,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Except } @Override - public void invalidate(K key) { + public void invalidate(ICacheKey key) { if (this.cache.containsKey(key)) { // eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, null, RemovalReason.INVALIDATED, CacheStoreType.DISK)); } @@ -1053,7 +1047,7 @@ public void invalidateAll() { } @Override - public Iterable keys() { + public Iterable> keys() { return this.cache.keySet(); } @@ -1065,6 +1059,11 @@ public long count() { @Override public void refresh() {} + @Override + public CacheStats stats() { + return stats; + } + @Override public void close() { diff --git a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java index 666d89e98127f..fcc661fd16676 100644 --- a/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java +++ b/plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java @@ -16,21 +16,27 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.ICacheKey; import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.serializer.ICacheKeySerializer; +import org.opensearch.common.cache.serializer.Serializer; +import org.opensearch.common.cache.stats.CacheStats; +import org.opensearch.common.cache.stats.MultiDimensionCacheStats; import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.collect.Tuple; -import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import java.io.File; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; @@ -40,6 +46,7 @@ import java.util.concurrent.ExecutionException; import java.util.function.BiFunction; import java.util.function.Supplier; +import java.util.function.ToLongBiFunction; import org.ehcache.Cache; import org.ehcache.CachePersistenceException; @@ -50,6 +57,7 @@ import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder; import org.ehcache.config.builders.ResourcePoolsBuilder; import org.ehcache.config.units.MemoryUnit; +import org.ehcache.core.spi.service.FileBasedPersistenceContext; import org.ehcache.event.CacheEvent; import org.ehcache.event.CacheEventListener; import org.ehcache.event.EventType; @@ -57,6 +65,7 @@ import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration; import org.ehcache.spi.loaderwriter.CacheLoadingException; import org.ehcache.spi.loaderwriter.CacheWritingException; +import org.ehcache.spi.serialization.SerializerException; import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_CACHE_ALIAS_KEY; import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY; @@ -89,26 +98,29 @@ public class EhcacheDiskCache implements ICache { private final PersistentCacheManager cacheManager; // Disk cache - private Cache cache; + private Cache cache; private final long maxWeightInBytes; private final String storagePath; private final Class keyType; private final Class valueType; private final TimeValue expireAfterAccess; + private final CacheStats stats; private final EhCacheEventListener ehCacheEventListener; private final String threadPoolAlias; private final Settings settings; - private final RemovalListener removalListener; private final CacheType cacheType; private final String diskCacheAlias; - // TODO: Move count to stats once those changes are ready. - private final CounterMetric entries = new CounterMetric(); + + private final Serializer keySerializer; + private final Serializer valueSerializer; + + public final static String TIER_DIMENSION_VALUE = "disk"; /** * Used in computeIfAbsent to synchronize loading of a given key. This is needed as ehcache doesn't provide a * computeIfAbsent method. */ - Map>> completableFutureMap = new ConcurrentHashMap<>(); + Map, CompletableFuture, V>>> completableFutureMap = new ConcurrentHashMap<>(); private EhcacheDiskCache(Builder builder) { this.keyType = Objects.requireNonNull(builder.keyType, "Key type shouldn't be null"); @@ -134,34 +146,40 @@ private EhcacheDiskCache(Builder builder) { this.threadPoolAlias = builder.threadPoolAlias; } this.settings = Objects.requireNonNull(builder.getSettings(), "Settings objects shouldn't be null"); + this.keySerializer = Objects.requireNonNull(builder.keySerializer, "Key serializer shouldn't be null"); + this.valueSerializer = Objects.requireNonNull(builder.valueSerializer, "Value serializer shouldn't be null"); this.cacheManager = buildCacheManager(); - Objects.requireNonNull(builder.getRemovalListener(), "Removal listener can't be null"); - this.removalListener = builder.getRemovalListener(); - this.ehCacheEventListener = new EhCacheEventListener(builder.getRemovalListener()); + this.ehCacheEventListener = new EhCacheEventListener( + Objects.requireNonNull(builder.getRemovalListener(), "Removal listener can't be null"), + Objects.requireNonNull(builder.getWeigher(), "Weigher function can't be null"), + this.valueSerializer + ); this.cache = buildCache(Duration.ofMillis(expireAfterAccess.getMillis()), builder); + List dimensionNames = Objects.requireNonNull(builder.dimensionNames, "Dimension names can't be null"); + this.stats = new MultiDimensionCacheStats(dimensionNames, TIER_DIMENSION_VALUE); } - private Cache buildCache(Duration expireAfterAccess, Builder builder) { + private Cache buildCache(Duration expireAfterAccess, Builder builder) { try { return this.cacheManager.createCache( this.diskCacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder( - this.keyType, - this.valueType, + ICacheKey.class, + byte[].class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B) ).withExpiry(new ExpiryPolicy<>() { @Override - public Duration getExpiryForCreation(K key, V value) { + public Duration getExpiryForCreation(ICacheKey key, byte[] value) { return INFINITE; } @Override - public Duration getExpiryForAccess(K key, Supplier value) { + public Duration getExpiryForAccess(ICacheKey key, Supplier value) { return expireAfterAccess; } @Override - public Duration getExpiryForUpdate(K key, Supplier oldValue, V newValue) { + public Duration getExpiryForUpdate(ICacheKey key, Supplier oldValue, byte[] newValue) { return INFINITE; } }) @@ -175,6 +193,7 @@ public Duration getExpiryForUpdate(K key, Supplier oldValue, V newV (Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings) ) ) + .withKeySerializer(new KeySerializerWrapper(keySerializer)) ); } catch (IllegalArgumentException ex) { logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage()); @@ -202,7 +221,7 @@ private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder< } // Package private for testing - Map>> getCompletableFutureMap() { + Map, CompletableFuture, V>>> getCompletableFutureMap() { return completableFutureMap; } @@ -231,16 +250,21 @@ private PersistentCacheManager buildCacheManager() { } @Override - public V get(K key) { + public V get(ICacheKey key) { if (key == null) { throw new IllegalArgumentException("Key passed to ehcache disk cache was null."); } V value; try { - value = cache.get(key); + value = valueSerializer.deserialize(cache.get(key)); } catch (CacheLoadingException ex) { throw new OpenSearchException("Exception occurred while trying to fetch item from ehcache disk cache"); } + if (value != null) { + stats.incrementHitsByDimensions(key.dimensions); + } else { + stats.incrementMissesByDimensions(key.dimensions); + } return value; } @@ -250,9 +274,9 @@ public V get(K key) { * @param value Type of value. */ @Override - public void put(K key, V value) { + public void put(ICacheKey key, V value) { try { - cache.put(key, value); + cache.put(key, valueSerializer.serialize(value)); } catch (CacheWritingException ex) { throw new OpenSearchException("Exception occurred while put item to ehcache disk cache"); } @@ -266,29 +290,34 @@ public void put(K key, V value) { * @throws Exception when either internal get or put calls fail. */ @Override - public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { - // Ehache doesn't provide any computeIfAbsent function. Exposes putIfAbsent but that works differently and is + public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { + // Ehcache doesn't provide any computeIfAbsent function. Exposes putIfAbsent but that works differently and is // not performant in case there are multiple concurrent request for same key. Below is our own custom // implementation of computeIfAbsent on top of ehcache. Inspired by OpenSearch Cache implementation. - V value = cache.get(key); + V value = valueSerializer.deserialize(cache.get(key)); if (value == null) { value = compute(key, loader); } + if (!loader.isLoaded()) { + stats.incrementHitsByDimensions(key.dimensions); + } else { + stats.incrementMissesByDimensions(key.dimensions); + } return value; } - private V compute(K key, LoadAwareCacheLoader loader) throws Exception { + private V compute(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { // A future that returns a pair of key/value. - CompletableFuture> completableFuture = new CompletableFuture<>(); + CompletableFuture, V>> completableFuture = new CompletableFuture<>(); // Only one of the threads will succeed putting a future into map for the same key. // Rest will fetch existing future. - CompletableFuture> future = completableFutureMap.putIfAbsent(key, completableFuture); + CompletableFuture, V>> future = completableFutureMap.putIfAbsent(key, completableFuture); // Handler to handle results post processing. Takes a tuple or exception as an input and returns // the value. Also before returning value, puts the value in cache. - BiFunction, Throwable, V> handler = (pair, ex) -> { + BiFunction, V>, Throwable, V> handler = (pair, ex) -> { V value = null; if (pair != null) { - cache.put(pair.v1(), pair.v2()); + cache.put(pair.v1(), valueSerializer.serialize(pair.v2())); value = pair.v2(); // Returning a value itself assuming that a next get should return the same. Should // be safe to assume if we got no exception and reached here. } @@ -335,7 +364,7 @@ private V compute(K key, LoadAwareCacheLoader loader) throws Exception { * @param key key to be invalidated. */ @Override - public void invalidate(K key) { + public void invalidate(ICacheKey key) { try { cache.remove(key); } catch (CacheWritingException ex) { @@ -353,7 +382,7 @@ public void invalidateAll() {} * @return Iterable */ @Override - public Iterable keys() { + public Iterable> keys() { return () -> new EhCacheKeyIterator<>(cache.iterator()); } @@ -363,7 +392,7 @@ public Iterable keys() { */ @Override public long count() { - return entries.count(); + return stats.getTotalEntries(); } @Override @@ -382,15 +411,24 @@ public void close() { } } + /** + * Relevant stats for this cache. + * @return CacheStats + */ + @Override + public CacheStats stats() { + return stats; + } + /** * This iterator wraps ehCache iterator and only iterates over its keys. * @param Type of key */ - class EhCacheKeyIterator implements Iterator { + class EhCacheKeyIterator implements Iterator> { - Iterator> iterator; + Iterator> iterator; - EhCacheKeyIterator(Iterator> iterator) { + EhCacheKeyIterator(Iterator> iterator) { this.iterator = iterator; } @@ -400,7 +438,7 @@ public boolean hasNext() { } @Override - public K next() { + public ICacheKey next() { if (!hasNext()) { throw new NoSuchElementException(); } @@ -413,40 +451,70 @@ public K next() { * @param Type of key * @param Type of value */ - class EhCacheEventListener implements CacheEventListener { + class EhCacheEventListener implements CacheEventListener, byte[]> { + private final RemovalListener, V> removalListener; + private ToLongBiFunction, V> weigher; + private Serializer valueSerializer; + + EhCacheEventListener( + RemovalListener, V> removalListener, + ToLongBiFunction, V> weigher, + Serializer valueSerializer + ) { + this.removalListener = removalListener; + this.weigher = weigher; + this.valueSerializer = valueSerializer; + } - private final RemovalListener removalListener; + private long getOldValuePairSize(CacheEvent, ? extends byte[]> event) { + return weigher.applyAsLong(event.getKey(), valueSerializer.deserialize(event.getOldValue())); + } - EhCacheEventListener(RemovalListener removalListener) { - this.removalListener = removalListener; + private long getNewValuePairSize(CacheEvent, ? extends byte[]> event) { + return weigher.applyAsLong(event.getKey(), valueSerializer.deserialize(event.getNewValue())); } @Override - public void onEvent(CacheEvent event) { + public void onEvent(CacheEvent, ? extends byte[]> event) { switch (event.getType()) { case CREATED: - entries.inc(); - // this.eventListener.onCached(event.getKey(), event.getNewValue(), CacheStoreType.DISK); + stats.incrementEntriesByDimensions(event.getKey().dimensions); + stats.incrementMemorySizeByDimensions(event.getKey().dimensions, getNewValuePairSize(event)); assert event.getOldValue() == null; break; case EVICTED: - this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.EVICTED)); - entries.dec(); + this.removalListener.onRemoval( + new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.EVICTED) + ); + stats.decrementEntriesByDimensions(event.getKey().dimensions); + stats.incrementMemorySizeByDimensions(event.getKey().dimensions, -getOldValuePairSize(event)); + stats.incrementEvictionsByDimensions(event.getKey().dimensions); assert event.getNewValue() == null; break; case REMOVED: - entries.dec(); - this.removalListener.onRemoval(new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.EXPLICIT)); + this.removalListener.onRemoval( + new RemovalNotification<>(event.getKey(), valueSerializer.deserialize(event.getOldValue()), RemovalReason.EXPLICIT) + ); + stats.decrementEntriesByDimensions(event.getKey().dimensions); + stats.incrementMemorySizeByDimensions(event.getKey().dimensions, -getOldValuePairSize(event)); assert event.getNewValue() == null; break; case EXPIRED: this.removalListener.onRemoval( - new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.INVALIDATED) + new RemovalNotification<>( + event.getKey(), + valueSerializer.deserialize(event.getOldValue()), + RemovalReason.INVALIDATED + ) ); - entries.dec(); + stats.decrementEntriesByDimensions(event.getKey().dimensions); + stats.incrementMemorySizeByDimensions(event.getKey().dimensions, -getOldValuePairSize(event)); assert event.getNewValue() == null; break; case UPDATED: + long newSize = getNewValuePairSize(event); + long oldSize = getOldValuePairSize(event); + stats.incrementMemorySizeByDimensions(event.getKey().dimensions, newSize - oldSize); break; default: break; @@ -454,6 +522,38 @@ public void onEvent(CacheEvent event) { } } + private class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer { + private ICacheKeySerializer serializer; + + public KeySerializerWrapper(Serializer internalKeySerializer) { + this.serializer = new ICacheKeySerializer<>(internalKeySerializer); + } + + // This constructor must be present, but does not have to work as we are not actually persisting the disk + // cache after a restart. + // See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches + public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {} + + @Override + public ByteBuffer serialize(ICacheKey object) throws SerializerException { + return ByteBuffer.wrap(serializer.serialize(object)); + } + + @Override + public ICacheKey read(ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return serializer.deserialize(arr); + } + + @Override + public boolean equals(ICacheKey object, ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return serializer.equals(object, arr); + } + } + /** * Factory to create an ehcache disk cache. */ @@ -473,11 +573,30 @@ public EhcacheDiskCacheFactory() {} public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { Map> settingList = EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType); Settings settings = config.getSettings(); + + Serializer keySerializer = null; + try { + keySerializer = (Serializer) config.getKeySerializer(); + } catch (ClassCastException e) { + throw new IllegalArgumentException("EhcacheDiskCache requires a key serializer of type Serializer"); + } + + Serializer valueSerializer = null; + try { + valueSerializer = (Serializer) config.getValueSerializer(); + } catch (ClassCastException e) { + throw new IllegalArgumentException("EhcacheDiskCache requires a value serializer of type Serializer"); + } + return new Builder().setStoragePath((String) settingList.get(DISK_STORAGE_PATH_KEY).get(settings)) .setDiskCacheAlias((String) settingList.get(DISK_CACHE_ALIAS_KEY).get(settings)) .setCacheType(cacheType) .setKeyType((config.getKeyType())) .setValueType(config.getValueType()) + .setKeySerializer(keySerializer) + .setValueSerializer(valueSerializer) + .setDimensionNames(config.getDimensionNames()) + .setWeigher(config.getWeigher()) .setRemovalListener(config.getRemovalListener()) .setExpireAfterAccess((TimeValue) settingList.get(DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY).get(settings)) .setMaximumWeightInBytes((Long) settingList.get(DISK_MAX_SIZE_IN_BYTES_KEY).get(settings)) @@ -497,20 +616,18 @@ public String getCacheName() { * @param Type of value */ public static class Builder extends ICacheBuilder { - private CacheType cacheType; private String storagePath; - private String threadPoolAlias; - private String diskCacheAlias; // Provides capability to make ehCache event listener to run in sync mode. Used for testing too. private boolean isEventListenerModeSync; - private Class keyType; - private Class valueType; + private List dimensionNames; + private Serializer keySerializer; + private Serializer valueSerializer; /** * Default constructor. Added to fix javadocs. @@ -587,7 +704,22 @@ public Builder setIsEventListenerModeSync(boolean isEventListenerModeSync) return this; } - @Override + public Builder setDimensionNames(List dimensionNames) { + this.dimensionNames = dimensionNames; + return this; + } + + public Builder setKeySerializer(Serializer keySerializer) { + this.keySerializer = keySerializer; + return this; + } + + public Builder setValueSerializer(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + return this; + } + + // @Override public EhcacheDiskCache build() { return new EhcacheDiskCache<>(this); } diff --git a/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java b/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java index 862bebba7e628..20230878dbf89 100644 --- a/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java +++ b/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java @@ -11,17 +11,22 @@ import org.opensearch.cache.EhcacheDiskCacheSettings; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.ICacheKey; import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.serializer.Serializer; +import org.opensearch.common.cache.stats.CacheStatsDimension; import org.opensearch.common.cache.store.config.CacheConfig; -import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.env.NodeEnvironment; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -31,6 +36,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.ToLongBiFunction; import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_MAX_SIZE_IN_BYTES_KEY; import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_STORAGE_PATH_KEY; @@ -39,41 +46,58 @@ public class EhCacheDiskCacheTests extends OpenSearchSingleNodeTestCase { private static final int CACHE_SIZE_IN_BYTES = 1024 * 101; + private final String dimensionName = "shardId"; public void testBasicGetAndPut() throws IOException { Settings settings = Settings.builder().build(); - MockRemovalListener removalListener = new MockRemovalListener<>(); + MockRemovalListener mockRemovalListener = new MockRemovalListener<>(); + ToLongBiFunction, String> weigher = getWeigher(); try (NodeEnvironment env = newNodeEnvironment(settings)) { ICache ehcacheTest = new EhcacheDiskCache.Builder().setThreadPoolAlias("ehcacheTest") .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setDimensionNames(List.of(dimensionName)) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) - .setRemovalListener(removalListener) + .setRemovalListener(mockRemovalListener) + .setWeigher(weigher) .build(); int randomKeys = randomIntBetween(10, 100); + long expectedSize = 0; Map keyValueMap = new HashMap<>(); for (int i = 0; i < randomKeys; i++) { keyValueMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); } for (Map.Entry entry : keyValueMap.entrySet()) { - ehcacheTest.put(entry.getKey(), entry.getValue()); + ICacheKey iCacheKey = getICacheKey(entry.getKey()); + ehcacheTest.put(iCacheKey, entry.getValue()); + expectedSize += weigher.applyAsLong(iCacheKey, entry.getValue()); } for (Map.Entry entry : keyValueMap.entrySet()) { - String value = ehcacheTest.get(entry.getKey()); + String value = ehcacheTest.get(getICacheKey(entry.getKey())); assertEquals(entry.getValue(), value); } + assertEquals(randomKeys, ehcacheTest.stats().getTotalEntries()); + assertEquals(randomKeys, ehcacheTest.stats().getEntriesByDimensions(List.of(getMockDimensions().get(0)))); + assertEquals(randomKeys, ehcacheTest.stats().getTotalHits()); + assertEquals(randomKeys, ehcacheTest.stats().getHitsByDimensions(List.of(getMockDimensions().get(0)))); + assertEquals(expectedSize, ehcacheTest.stats().getTotalMemorySize()); + assertEquals(expectedSize, ehcacheTest.stats().getMemorySizeByDimensions(List.of(getMockDimensions().get(0)))); assertEquals(randomKeys, ehcacheTest.count()); // Validate misses int expectedNumberOfMisses = randomIntBetween(10, 200); for (int i = 0; i < expectedNumberOfMisses; i++) { - ehcacheTest.get(UUID.randomUUID().toString()); + ehcacheTest.get(getICacheKey(UUID.randomUUID().toString())); } + assertEquals(expectedNumberOfMisses, ehcacheTest.stats().getTotalMisses()); + assertEquals(expectedNumberOfMisses, ehcacheTest.stats().getMissesByDimensions(List.of(getMockDimensions().get(0)))); ehcacheTest.close(); } } @@ -86,6 +110,10 @@ public void testBasicGetAndPutUsingFactory() throws IOException { new CacheConfig.Builder().setValueType(String.class) .setKeyType(String.class) .setRemovalListener(removalListener) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setDimensionNames(List.of(dimensionName)) + .setWeigher(getWeigher()) .setSettings( Settings.builder() .put( @@ -107,14 +135,14 @@ public void testBasicGetAndPutUsingFactory() throws IOException { Map.of() ); int randomKeys = randomIntBetween(10, 100); - Map keyValueMap = new HashMap<>(); + Map, String> keyValueMap = new HashMap<>(); for (int i = 0; i < randomKeys; i++) { - keyValueMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + keyValueMap.put(getICacheKey(UUID.randomUUID().toString()), UUID.randomUUID().toString()); } - for (Map.Entry entry : keyValueMap.entrySet()) { + for (Map.Entry, String> entry : keyValueMap.entrySet()) { ehcacheTest.put(entry.getKey(), entry.getValue()); } - for (Map.Entry entry : keyValueMap.entrySet()) { + for (Map.Entry, String> entry : keyValueMap.entrySet()) { String value = ehcacheTest.get(entry.getKey()); assertEquals(entry.getValue(), value); } @@ -123,7 +151,7 @@ public void testBasicGetAndPutUsingFactory() throws IOException { // Validate misses int expectedNumberOfMisses = randomIntBetween(10, 200); for (int i = 0; i < expectedNumberOfMisses; i++) { - ehcacheTest.get(UUID.randomUUID().toString()); + ehcacheTest.get(getICacheKey(UUID.randomUUID().toString())); } ehcacheTest.close(); @@ -132,18 +160,22 @@ public void testBasicGetAndPutUsingFactory() throws IOException { public void testConcurrentPut() throws Exception { Settings settings = Settings.builder().build(); - MockRemovalListener removalListener = new MockRemovalListener<>(); + MockRemovalListener mockRemovalListener = new MockRemovalListener<>(); try (NodeEnvironment env = newNodeEnvironment(settings)) { ICache ehcacheTest = new EhcacheDiskCache.Builder().setDiskCacheAlias("test1") .setThreadPoolAlias("ehcacheTest") .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setDimensionNames(List.of(dimensionName)) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) - .setRemovalListener(removalListener) + .setRemovalListener(mockRemovalListener) + .setWeigher(getWeigher()) .build(); int randomKeys = randomIntBetween(20, 100); Thread[] threads = new Thread[randomKeys]; @@ -157,7 +189,7 @@ public void testConcurrentPut() throws Exception { for (Map.Entry entry : keyValueMap.entrySet()) { threads[j] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); - ehcacheTest.put(entry.getKey(), entry.getValue()); + ehcacheTest.put(getICacheKey(entry.getKey()), entry.getValue()); countDownLatch.countDown(); }); threads[j].start(); @@ -166,17 +198,17 @@ public void testConcurrentPut() throws Exception { phaser.arriveAndAwaitAdvance(); // Will trigger parallel puts above. countDownLatch.await(); // Wait for all threads to finish for (Map.Entry entry : keyValueMap.entrySet()) { - String value = ehcacheTest.get(entry.getKey()); + String value = ehcacheTest.get(getICacheKey(entry.getKey())); assertEquals(entry.getValue(), value); } - assertEquals(randomKeys, ehcacheTest.count()); + assertEquals(randomKeys, ehcacheTest.stats().getTotalEntries()); ehcacheTest.close(); } } public void testEhcacheParallelGets() throws Exception { Settings settings = Settings.builder().build(); - MockRemovalListener removalListener = new MockRemovalListener<>(); + MockRemovalListener mockRemovalListener = new MockRemovalListener<>(); try (NodeEnvironment env = newNodeEnvironment(settings)) { ICache ehcacheTest = new EhcacheDiskCache.Builder().setDiskCacheAlias("test1") .setThreadPoolAlias("ehcacheTest") @@ -184,11 +216,15 @@ public void testEhcacheParallelGets() throws Exception { .setIsEventListenerModeSync(true) // For accurate count .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setDimensionNames(List.of(dimensionName)) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) - .setRemovalListener(removalListener) + .setRemovalListener(mockRemovalListener) + .setWeigher(getWeigher()) .build(); int randomKeys = randomIntBetween(20, 100); Thread[] threads = new Thread[randomKeys]; @@ -200,13 +236,13 @@ public void testEhcacheParallelGets() throws Exception { keyValueMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); } for (Map.Entry entry : keyValueMap.entrySet()) { - ehcacheTest.put(entry.getKey(), entry.getValue()); + ehcacheTest.put(getICacheKey(entry.getKey()), entry.getValue()); } assertEquals(keyValueMap.size(), ehcacheTest.count()); for (Map.Entry entry : keyValueMap.entrySet()) { threads[j] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); - assertEquals(entry.getValue(), ehcacheTest.get(entry.getKey())); + assertEquals(entry.getValue(), ehcacheTest.get(getICacheKey(entry.getKey()))); countDownLatch.countDown(); }); threads[j].start(); @@ -214,6 +250,7 @@ public void testEhcacheParallelGets() throws Exception { } phaser.arriveAndAwaitAdvance(); // Will trigger parallel puts above. countDownLatch.await(); // Wait for all threads to finish + assertEquals(randomKeys, ehcacheTest.stats().getTotalHits()); ehcacheTest.close(); } } @@ -226,11 +263,15 @@ public void testEhcacheKeyIterator() throws Exception { .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setDimensionNames(List.of(dimensionName)) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) .setRemovalListener(new MockRemovalListener<>()) + .setWeigher(getWeigher()) .build(); int randomKeys = randomIntBetween(2, 100); @@ -239,12 +280,12 @@ public void testEhcacheKeyIterator() throws Exception { keyValueMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); } for (Map.Entry entry : keyValueMap.entrySet()) { - ehcacheTest.put(entry.getKey(), entry.getValue()); + ehcacheTest.put(getICacheKey(entry.getKey()), entry.getValue()); } - Iterator keys = ehcacheTest.keys().iterator(); + Iterator> keys = ehcacheTest.keys().iterator(); int keysCount = 0; while (keys.hasNext()) { - String key = keys.next(); + ICacheKey key = keys.next(); keysCount++; assertNotNull(ehcacheTest.get(key)); } @@ -255,7 +296,8 @@ public void testEhcacheKeyIterator() throws Exception { public void testEvictions() throws Exception { Settings settings = Settings.builder().build(); - MockRemovalListener removalListener = new MockRemovalListener<>(); + MockRemovalListener mockRemovalListener = new MockRemovalListener<>(); + ToLongBiFunction, String> weigher = getWeigher(); try (NodeEnvironment env = newNodeEnvironment(settings)) { ICache ehcacheTest = new EhcacheDiskCache.Builder().setDiskCacheAlias("test1") .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") @@ -263,11 +305,15 @@ public void testEvictions() throws Exception { .setThreadPoolAlias("ehcacheTest") .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setDimensionNames(List.of(dimensionName)) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) - .setRemovalListener(removalListener) + .setRemovalListener(mockRemovalListener) + .setWeigher(weigher) .build(); // Generate a string with 100 characters @@ -276,16 +322,18 @@ public void testEvictions() throws Exception { // Trying to generate more than 100kb to cause evictions. for (int i = 0; i < 1000; i++) { String key = "Key" + i; - ehcacheTest.put(key, value); + ICacheKey iCacheKey = getICacheKey((key)); + ehcacheTest.put(iCacheKey, value); } - assertEquals(660, removalListener.evictionMetric.count()); + assertTrue(mockRemovalListener.onRemovalCount.get() > 0); + assertEquals(660, ehcacheTest.stats().getTotalEvictions()); ehcacheTest.close(); } } public void testComputeIfAbsentConcurrently() throws Exception { Settings settings = Settings.builder().build(); - MockRemovalListener removalListener = new MockRemovalListener<>(); + MockRemovalListener mockRemovalListener = new MockRemovalListener<>(); try (NodeEnvironment env = newNodeEnvironment(settings)) { ICache ehcacheTest = new EhcacheDiskCache.Builder().setDiskCacheAlias("test1") .setIsEventListenerModeSync(true) @@ -293,11 +341,15 @@ public void testComputeIfAbsentConcurrently() throws Exception { .setThreadPoolAlias("ehcacheTest") .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setDimensionNames(List.of(dimensionName)) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) - .setRemovalListener(removalListener) + .setRemovalListener(mockRemovalListener) + .setWeigher(getWeigher()) .build(); int numberOfRequest = 2;// randomIntBetween(200, 400); @@ -307,12 +359,12 @@ public void testComputeIfAbsentConcurrently() throws Exception { Phaser phaser = new Phaser(numberOfRequest + 1); CountDownLatch countDownLatch = new CountDownLatch(numberOfRequest); - List> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); // Try to hit different request with the same key concurrently. Verify value is only loaded once. for (int i = 0; i < numberOfRequest; i++) { threads[i] = new Thread(() -> { - LoadAwareCacheLoader loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { boolean isLoaded; @Override @@ -321,7 +373,7 @@ public boolean isLoaded() { } @Override - public String load(String key) { + public String load(ICacheKey key) { isLoaded = true; return value; } @@ -329,7 +381,7 @@ public String load(String key) { loadAwareCacheLoaderList.add(loadAwareCacheLoader); phaser.arriveAndAwaitAdvance(); try { - assertEquals(value, ehcacheTest.computeIfAbsent(key, loadAwareCacheLoader)); + assertEquals(value, ehcacheTest.computeIfAbsent(getICacheKey(key), loadAwareCacheLoader)); } catch (Exception e) { throw new RuntimeException(e); } @@ -347,6 +399,9 @@ public String load(String key) { } assertEquals(1, numberOfTimesValueLoaded); assertEquals(0, ((EhcacheDiskCache) ehcacheTest).getCompletableFutureMap().size()); + assertEquals(1, ehcacheTest.stats().getTotalMisses()); + assertEquals(1, ehcacheTest.stats().getTotalEntries()); + assertEquals(numberOfRequest - 1, ehcacheTest.stats().getTotalHits()); assertEquals(1, ehcacheTest.count()); ehcacheTest.close(); } @@ -354,7 +409,7 @@ public String load(String key) { public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception { Settings settings = Settings.builder().build(); - MockRemovalListener removalListener = new MockRemovalListener<>(); + MockRemovalListener mockRemovalListener = new MockRemovalListener<>(); try (NodeEnvironment env = newNodeEnvironment(settings)) { ICache ehcacheTest = new EhcacheDiskCache.Builder().setDiskCacheAlias("test1") .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") @@ -362,11 +417,15 @@ public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception .setThreadPoolAlias("ehcacheTest") .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setDimensionNames(List.of(dimensionName)) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) - .setRemovalListener(removalListener) + .setRemovalListener(mockRemovalListener) + .setWeigher(getWeigher()) .build(); int numberOfRequest = randomIntBetween(200, 400); @@ -375,12 +434,12 @@ public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception Phaser phaser = new Phaser(numberOfRequest + 1); CountDownLatch countDownLatch = new CountDownLatch(numberOfRequest); - List> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); // Try to hit different request with the same key concurrently. Loader throws exception. for (int i = 0; i < numberOfRequest; i++) { threads[i] = new Thread(() -> { - LoadAwareCacheLoader loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { boolean isLoaded; @Override @@ -389,14 +448,14 @@ public boolean isLoaded() { } @Override - public String load(String key) throws Exception { + public String load(ICacheKey key) throws Exception { isLoaded = true; throw new RuntimeException("Exception"); } }; loadAwareCacheLoaderList.add(loadAwareCacheLoader); phaser.arriveAndAwaitAdvance(); - assertThrows(ExecutionException.class, () -> ehcacheTest.computeIfAbsent(key, loadAwareCacheLoader)); + assertThrows(ExecutionException.class, () -> ehcacheTest.computeIfAbsent(getICacheKey(key), loadAwareCacheLoader)); countDownLatch.countDown(); }); threads[i].start(); @@ -411,7 +470,7 @@ public String load(String key) throws Exception { public void testComputeIfAbsentWithNullValueLoading() throws Exception { Settings settings = Settings.builder().build(); - MockRemovalListener removalListener = new MockRemovalListener<>(); + MockRemovalListener mockRemovalListener = new MockRemovalListener<>(); try (NodeEnvironment env = newNodeEnvironment(settings)) { ICache ehcacheTest = new EhcacheDiskCache.Builder().setDiskCacheAlias("test1") .setThreadPoolAlias("ehcacheTest") @@ -419,11 +478,15 @@ public void testComputeIfAbsentWithNullValueLoading() throws Exception { .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setKeyType(String.class) .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setDimensionNames(List.of(dimensionName)) .setCacheType(CacheType.INDICES_REQUEST_CACHE) .setSettings(settings) .setExpireAfterAccess(TimeValue.MAX_VALUE) .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) - .setRemovalListener(removalListener) + .setRemovalListener(mockRemovalListener) + .setWeigher(getWeigher()) .build(); int numberOfRequest = randomIntBetween(200, 400); @@ -432,12 +495,12 @@ public void testComputeIfAbsentWithNullValueLoading() throws Exception { Phaser phaser = new Phaser(numberOfRequest + 1); CountDownLatch countDownLatch = new CountDownLatch(numberOfRequest); - List> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + List, String>> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); // Try to hit different request with the same key concurrently. Loader throws exception. for (int i = 0; i < numberOfRequest; i++) { threads[i] = new Thread(() -> { - LoadAwareCacheLoader loadAwareCacheLoader = new LoadAwareCacheLoader<>() { + LoadAwareCacheLoader, String> loadAwareCacheLoader = new LoadAwareCacheLoader<>() { boolean isLoaded; @Override @@ -446,7 +509,7 @@ public boolean isLoaded() { } @Override - public String load(String key) throws Exception { + public String load(ICacheKey key) throws Exception { isLoaded = true; return null; } @@ -454,11 +517,11 @@ public String load(String key) throws Exception { loadAwareCacheLoaderList.add(loadAwareCacheLoader); phaser.arriveAndAwaitAdvance(); try { - ehcacheTest.computeIfAbsent(key, loadAwareCacheLoader); + ehcacheTest.computeIfAbsent(getICacheKey(key), loadAwareCacheLoader); } catch (Exception ex) { assertThat(ex.getCause(), instanceOf(NullPointerException.class)); } - assertThrows(ExecutionException.class, () -> ehcacheTest.computeIfAbsent(key, loadAwareCacheLoader)); + assertThrows(ExecutionException.class, () -> ehcacheTest.computeIfAbsent(getICacheKey(key), loadAwareCacheLoader)); countDownLatch.countDown(); }); threads[i].start(); @@ -471,6 +534,124 @@ public String load(String key) throws Exception { } } + public void testMemoryTracking() throws Exception { + // TODO: This test leaks threads because of an issue in Ehcache: + // https://github.com/ehcache/ehcache3/issues/3204 + + // Test all cases for EhCacheEventListener.onEvent and check stats memory usage is updated correctly + Settings settings = Settings.builder().build(); + ToLongBiFunction, String> weigher = getWeigher(); + int initialKeyLength = 40; + int initialValueLength = 40; + long sizeForOneInitialEntry = weigher.applyAsLong( + new ICacheKey<>(generateRandomString(initialKeyLength), getMockDimensions()), + generateRandomString(initialValueLength) + ); + int maxEntries = 2000; + try (NodeEnvironment env = newNodeEnvironment(settings)) { + ICache ehcacheTest = new EhcacheDiskCache.Builder().setDiskCacheAlias("test1") + .setThreadPoolAlias("ehcacheTest") + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setKeyType(String.class) + .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setDimensionNames(List.of(dimensionName)) + .setIsEventListenerModeSync(true) // Test fails if async; probably not all updates happen before checking stats + .setCacheType(CacheType.INDICES_REQUEST_CACHE) + .setSettings(settings) + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setMaximumWeightInBytes(maxEntries * sizeForOneInitialEntry) + .setRemovalListener(new MockRemovalListener<>()) + .setWeigher(weigher) + .build(); + long expectedSize = 0; + + // Test CREATED case + int numInitialKeys = randomIntBetween(10, 100); + ArrayList> initialKeys = new ArrayList<>(); + for (int i = 0; i < numInitialKeys; i++) { + ICacheKey key = new ICacheKey<>(generateRandomString(initialKeyLength), getMockDimensions()); + String value = generateRandomString(initialValueLength); + ehcacheTest.put(key, value); + initialKeys.add(key); + expectedSize += weigher.applyAsLong(key, value); + assertEquals(expectedSize, ehcacheTest.stats().getTotalMemorySize()); + } + + // Test UPDATED case + HashMap, String> updatedValues = new HashMap<>(); + for (int i = 0; i < numInitialKeys * 0.5; i++) { + int newLengthDifference = randomIntBetween(-20, 20); + String newValue = generateRandomString(initialValueLength + newLengthDifference); + ehcacheTest.put(initialKeys.get(i), newValue); + updatedValues.put(initialKeys.get(i), newValue); + expectedSize += newLengthDifference; + assertEquals(expectedSize, ehcacheTest.stats().getTotalMemorySize()); + } + + // Test REMOVED case by removing all updated keys + for (int i = 0; i < numInitialKeys * 0.5; i++) { + ICacheKey removedKey = initialKeys.get(i); + ehcacheTest.invalidate(removedKey); + expectedSize -= weigher.applyAsLong(removedKey, updatedValues.get(removedKey)); + assertEquals(expectedSize, ehcacheTest.stats().getTotalMemorySize()); + } + + // Test EVICTED case by adding entries past the cap and ensuring memory size stays as what we expect + for (int i = 0; i < maxEntries - ehcacheTest.count(); i++) { + ICacheKey key = new ICacheKey<>(generateRandomString(initialKeyLength), getMockDimensions()); + String value = generateRandomString(initialValueLength); + ehcacheTest.put(key, value); + } + // TODO: Ehcache incorrectly evicts at 30-40% of max size. Fix this test once we figure out why. + // Since the EVICTED and EXPIRED cases use the same code as REMOVED, we should be ok on testing them for now. + // assertEquals(maxEntries * sizeForOneInitialEntry, ehcacheTest.stats().getTotalMemorySize()); + + ehcacheTest.close(); + } + } + + public void testGetStatsByTierName() throws Exception { + Settings settings = Settings.builder().build(); + MockRemovalListener mockRemovalListener = new MockRemovalListener<>(); + ToLongBiFunction, String> weigher = getWeigher(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + ICache ehcacheTest = new EhcacheDiskCache.Builder().setThreadPoolAlias("ehcacheTest") + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setKeyType(String.class) + .setValueType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setDimensionNames(List.of(dimensionName)) + .setCacheType(CacheType.INDICES_REQUEST_CACHE) + .setSettings(settings) + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setRemovalListener(mockRemovalListener) + .setWeigher(weigher) + .build(); + int randomKeys = randomIntBetween(10, 100); + for (int i = 0; i < randomKeys; i++) { + ehcacheTest.put(getICacheKey(UUID.randomUUID().toString()), UUID.randomUUID().toString()); + } + assertEquals( + randomKeys, + ehcacheTest.stats() + .getEntriesByDimensions( + List.of(new CacheStatsDimension(CacheStatsDimension.TIER_DIMENSION_NAME, EhcacheDiskCache.TIER_DIMENSION_VALUE)) + ) + ); + assertEquals( + 0, + ehcacheTest.stats() + .getEntriesByDimensions(List.of(new CacheStatsDimension(CacheStatsDimension.TIER_DIMENSION_NAME, "other_tier_value"))) + ); + + ehcacheTest.close(); + } + } + private static String generateRandomString(int length) { String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; StringBuilder randomString = new StringBuilder(length); @@ -483,13 +664,57 @@ private static String generateRandomString(int length) { return randomString.toString(); } - static class MockRemovalListener implements RemovalListener { + private List getMockDimensions() { + return List.of(new CacheStatsDimension(dimensionName, "0")); + } + + private ICacheKey getICacheKey(String key) { + return new ICacheKey<>(key, getMockDimensions()); + } - CounterMetric evictionMetric = new CounterMetric(); + private ToLongBiFunction, String> getWeigher() { + return (iCacheKey, value) -> { + // Size consumed by key + long totalSize = iCacheKey.key.length(); + for (CacheStatsDimension dim : iCacheKey.dimensions) { + totalSize += dim.dimensionName.length(); + totalSize += dim.dimensionValue.length(); + } + totalSize += 10; // The ICacheKeySerializer writes 2 VInts to record array lengths, which can be 1-5 bytes each + // Size consumed by value + totalSize += value.length(); + return totalSize; + }; + } + + class MockRemovalListener implements RemovalListener, V> { + AtomicInteger onRemovalCount = new AtomicInteger(); + + @Override + public void onRemoval(RemovalNotification, V> notification) { + onRemovalCount.incrementAndGet(); + } + } + + static class StringSerializer implements Serializer { + private final Charset charset = StandardCharsets.UTF_8; + + @Override + public byte[] serialize(String object) { + return object.getBytes(charset); + } + + @Override + public String deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + return new String(bytes, charset); + } @Override - public void onRemoval(RemovalNotification notification) { - evictionMetric.inc(); + public boolean equals(String object, byte[] bytes) { + return object.equals(deserialize(bytes)); } } } diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java index d8aa4e93735e6..ad8a1f01bb0fe 100644 --- a/server/src/main/java/org/opensearch/common/cache/Cache.java +++ b/server/src/main/java/org/opensearch/common/cache/Cache.java @@ -899,4 +899,8 @@ private void relinkAtHead(Entry entry) { private CacheSegment getCacheSegment(K key) { return segments[key.hashCode() & 0xff]; } + + public ToLongBiFunction getWeigher() { + return weigher; + } } diff --git a/server/src/main/java/org/opensearch/common/cache/ICache.java b/server/src/main/java/org/opensearch/common/cache/ICache.java index f7be46a852631..a7a712cc83ab3 100644 --- a/server/src/main/java/org/opensearch/common/cache/ICache.java +++ b/server/src/main/java/org/opensearch/common/cache/ICache.java @@ -9,6 +9,7 @@ package org.opensearch.common.cache; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.cache.stats.CacheStats; import org.opensearch.common.cache.store.config.CacheConfig; import java.io.Closeable; @@ -23,22 +24,24 @@ */ @ExperimentalApi public interface ICache extends Closeable { - V get(K key); + V get(ICacheKey key); - void put(K key, V value); + void put(ICacheKey key, V value); - V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception; + V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception; - void invalidate(K key); + void invalidate(ICacheKey key); void invalidateAll(); - Iterable keys(); + Iterable> keys(); long count(); void refresh(); + CacheStats stats(); + /** * Factory to create objects. */ diff --git a/server/src/main/java/org/opensearch/common/cache/ICacheKey.java b/server/src/main/java/org/opensearch/common/cache/ICacheKey.java new file mode 100644 index 0000000000000..51cb1712873c1 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/ICacheKey.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache; + +import org.opensearch.common.cache.stats.CacheStatsDimension; + +import java.util.List; +import java.util.Objects; + +public class ICacheKey { + public final K key; // K must implement equals() + public final List dimensions; + + public ICacheKey(K key, List dimensions) { + this.key = key; + this.dimensions = dimensions; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o == null) { + return false; + } + if (o.getClass() != ICacheKey.class) { + return false; + } + ICacheKey other = (ICacheKey) o; + return key.equals(other.key) && dimensions.equals(other.dimensions); + } + + @Override + public int hashCode() { + return Objects.hash(key, dimensions); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/serializer/BytesReferenceSerializer.java b/server/src/main/java/org/opensearch/common/cache/serializer/BytesReferenceSerializer.java new file mode 100644 index 0000000000000..d1cd872f5801f --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/serializer/BytesReferenceSerializer.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.serializer; + +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; + +import java.util.Arrays; + +/** + * A serializer which transforms BytesReference to byte[]. + * The type of BytesReference is NOT preserved after deserialization, but nothing in opensearch should care. + */ +public class BytesReferenceSerializer implements Serializer { + // This class does not get passed to ehcache itself, so it's not required that classes match after deserialization. + + public BytesReferenceSerializer() {} + + @Override + public byte[] serialize(BytesReference object) { + return BytesReference.toBytes(object); + } + + @Override + public BytesReference deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + return new BytesArray(bytes); + } + + @Override + public boolean equals(BytesReference object, byte[] bytes) { + return Arrays.equals(serialize(object), bytes); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/serializer/ICacheKeySerializer.java b/server/src/main/java/org/opensearch/common/cache/serializer/ICacheKeySerializer.java new file mode 100644 index 0000000000000..af95f119f286a --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/serializer/ICacheKeySerializer.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.serializer; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.cache.ICacheKey; +import org.opensearch.common.cache.stats.CacheStatsDimension; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class ICacheKeySerializer implements Serializer, byte[]> { + + public Serializer keySerializer; + + public ICacheKeySerializer(Serializer serializer) { + this.keySerializer = serializer; + } + + @Override + public byte[] serialize(ICacheKey object) { + if (object == null || object.key == null || object.dimensions == null) { + return null; + } + byte[] serializedKey = keySerializer.serialize(object.key); + try { + BytesStreamOutput os = new BytesStreamOutput(); + // First write the number of dimensions + os.writeVInt(object.dimensions.size()); + for (CacheStatsDimension dim : object.dimensions) { + dim.writeTo(os); + } + os.writeVInt(serializedKey.length); // ?? Is the read byte[] fn broken such that we have to do this? + os.writeBytes(serializedKey); // TODO: Is this re-copying unnecessarily? Come back to this + byte[] finalBytes = BytesReference.toBytes(os.bytes()); + return finalBytes; + } catch (IOException e) { + throw new OpenSearchException(e); + } + } + + @Override + public ICacheKey deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + List dimensionList = new ArrayList<>(); + try { + BytesStreamInput is = new BytesStreamInput(bytes, 0, bytes.length); + int numDimensions = is.readVInt(); + for (int i = 0; i < numDimensions; i++) { + dimensionList.add(new CacheStatsDimension(is)); + } + + int length = is.readVInt(); + byte[] serializedKey = new byte[length]; + is.readBytes(serializedKey, 0, length); // not sure why is.readByteArray doesn't work?? + return new ICacheKey<>(keySerializer.deserialize(serializedKey), dimensionList); + } catch (IOException e) { + throw new OpenSearchException(e); + } + } + + @Override + public boolean equals(ICacheKey object, byte[] bytes) { + return Arrays.equals(serialize(object), bytes); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/serializer/Serializer.java b/server/src/main/java/org/opensearch/common/cache/serializer/Serializer.java new file mode 100644 index 0000000000000..e9e3d81a0c4b8 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/serializer/Serializer.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.serializer; + +/** + * An interface for serializers, to be used in disk caching tier and elsewhere. + * T is the class of the original object, and U is the serialized class. + */ +public interface Serializer { + /** + * Serializes an object. + * @param object A non-serialized object. + * @return The serialized representation of the object. + */ + U serialize(T object); + + /** + * Deserializes bytes into an object. + * @param bytes The serialized representation. + * @return The original object. + */ + T deserialize(U bytes); + + /** + * Compares an object to a serialized representation of an object. + * @param object A non-serialized objet + * @param bytes Serialized representation of an object + * @return true if representing the same object, false if not + */ + boolean equals(T object, U bytes); +} diff --git a/server/src/main/java/org/opensearch/common/cache/stats/CacheStats.java b/server/src/main/java/org/opensearch/common/cache/stats/CacheStats.java new file mode 100644 index 0000000000000..7b24e3412c1f6 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/stats/CacheStats.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.stats; + +import org.opensearch.core.common.io.stream.Writeable; + +import java.util.List; + +/** + * Interface for any cache specific stats. Allows accessing stats by total value or by dimension, + * and also allows updating stats. + * When updating stats, we take in the list of dimensions associated with the key/value pair that caused the update. + * This allows us to aggregate stats by dimension when accessing them. + */ +public interface CacheStats extends Writeable { + + // Methods to get all 5 values at once, either in total or for a specific set of dimensions. + CacheStatsResponse getTotalStats(); + + CacheStatsResponse getStatsByDimensions(List dimensions); + + // Methods to get total values. + long getTotalHits(); + + long getTotalMisses(); + + long getTotalEvictions(); + + long getTotalMemorySize(); + + long getTotalEntries(); + + // Methods to get values for a specific set of dimensions. + // Returns the sum of values for cache entries that match all dimensions in the list. + long getHitsByDimensions(List dimensions); + + long getMissesByDimensions(List dimensions); + + long getEvictionsByDimensions(List dimensions); + + long getMemorySizeByDimensions(List dimensions); + + long getEntriesByDimensions(List dimensions); + + void incrementHitsByDimensions(List dimensions); + + void incrementMissesByDimensions(List dimensions); + + void incrementEvictionsByDimensions(List dimensions); + + // Can also use to decrement, with negative values + void incrementMemorySizeByDimensions(List dimensions, long amountBytes); + + void incrementEntriesByDimensions(List dimensions); + + void decrementEntriesByDimensions(List dimensions); + + // Resets memory and entries stats but leaves the others; called when the cache clears itself. + void reset(); + +} diff --git a/server/src/main/java/org/opensearch/common/cache/stats/CacheStatsDimension.java b/server/src/main/java/org/opensearch/common/cache/stats/CacheStatsDimension.java new file mode 100644 index 0000000000000..9aee24efb46f0 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/stats/CacheStatsDimension.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.stats; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Objects; + +public class CacheStatsDimension implements Writeable { + // Values for tier dimensions, that are reused across CacheStats implementations + public static final String TIER_DIMENSION_NAME = "tier"; + public final String dimensionName; + public final String dimensionValue; + + public CacheStatsDimension(String dimensionName, String dimensionValue) { + this.dimensionName = dimensionName; + this.dimensionValue = dimensionValue; + } + + public CacheStatsDimension(StreamInput in) throws IOException { + this.dimensionName = in.readString(); + this.dimensionValue = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(dimensionName); + out.writeString(dimensionValue); + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o == null) { + return false; + } + if (o.getClass() != CacheStatsDimension.class) { + return false; + } + CacheStatsDimension other = (CacheStatsDimension) o; + if (other.dimensionName == null || other.dimensionValue == null) { + return false; + } + return other.dimensionName.equals(dimensionName) && other.dimensionValue.equals(dimensionValue); + } + + @Override + public int hashCode() { + return Objects.hash(dimensionName, dimensionValue); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/stats/CacheStatsResponse.java b/server/src/main/java/org/opensearch/common/cache/stats/CacheStatsResponse.java new file mode 100644 index 0000000000000..520a771510c43 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/stats/CacheStatsResponse.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.stats; + +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Objects; + +/** + * A class containing the 5 metrics tracked by a CacheStats object. + */ +public class CacheStatsResponse implements Writeable { // TODO: Make this extend ToXContent. + public CounterMetric hits; + public CounterMetric misses; + public CounterMetric evictions; + public CounterMetric memorySize; + public CounterMetric entries; + + public CacheStatsResponse(long hits, long misses, long evictions, long memorySize, long entries) { + this.hits = new CounterMetric(); + this.hits.inc(hits); + this.misses = new CounterMetric(); + this.misses.inc(misses); + this.evictions = new CounterMetric(); + this.evictions.inc(evictions); + this.memorySize = new CounterMetric(); + this.memorySize.inc(memorySize); + this.entries = new CounterMetric(); + this.entries.inc(entries); + } + + public CacheStatsResponse(StreamInput in) throws IOException { + this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); + } + + public CacheStatsResponse() { + this(0, 0, 0, 0, 0); + } + + public synchronized void add(CacheStatsResponse other) { + if (other == null) { + return; + } + this.hits.inc(other.hits.count()); + this.misses.inc(other.misses.count()); + this.evictions.inc(other.evictions.count()); + this.memorySize.inc(other.memorySize.count()); + this.entries.inc(other.entries.count()); + } + + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + if (o.getClass() != CacheStatsResponse.class) { + return false; + } + CacheStatsResponse other = (CacheStatsResponse) o; + return (hits.count() == other.hits.count()) + && (misses.count() == other.misses.count()) + && (evictions.count() == other.evictions.count()) + && (memorySize.count() == other.memorySize.count()) + && (entries.count() == other.entries.count()); + } + + @Override + public int hashCode() { + return Objects.hash(hits.count(), misses.count(), evictions.count(), memorySize.count(), entries.count()); + } + + public long getHits() { + return hits.count(); + } + + public long getMisses() { + return misses.count(); + } + + public long getEvictions() { + return evictions.count(); + } + + public long getMemorySize() { + return memorySize.count(); + } + + public long getEntries() { + return entries.count(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(hits.count()); + out.writeVLong(misses.count()); + out.writeVLong(evictions.count()); + out.writeVLong(memorySize.count()); + out.writeVLong(entries.count()); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/stats/MultiDimensionCacheStats.java b/server/src/main/java/org/opensearch/common/cache/stats/MultiDimensionCacheStats.java new file mode 100644 index 0000000000000..1f977a7c040b3 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/stats/MultiDimensionCacheStats.java @@ -0,0 +1,295 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.stats; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; + +/** + * A CacheStats object supporting multiple different dimensions. + * Also keeps track of a tier dimension, which is assumed to be the same for all values in the stats object. + * The tier dimension value should not be passed into the CacheStats API functions for updating values. + */ +public class MultiDimensionCacheStats implements CacheStats { + + /** + * For memory purposes, don't track stats for more than this many distinct combinations of dimension values. + */ + public final static int DEFAULT_MAX_DIMENSION_VALUES = 20_000; + + // pkg-private for testing + final List dimensionNames; + + // The value of the tier dimension for entries in this Stats object. This is handled separately for efficiency, + // as it always has the same value for every entry in the stats object. + // Package-private for testing. + final String tierDimensionValue; + + // A map from a set of cache stats dimensions -> stats for that combination of dimensions. Does not include the tier dimension in its + // keys. + final ConcurrentMap map; + + final int maxDimensionValues; + CacheStatsResponse totalStats; + + public MultiDimensionCacheStats(List dimensionNames, String tierDimensionValue, int maxDimensionValues) { + this.dimensionNames = dimensionNames; + this.map = new ConcurrentHashMap<>(); + this.totalStats = new CacheStatsResponse(); + this.tierDimensionValue = tierDimensionValue; + this.maxDimensionValues = maxDimensionValues; + } + + public MultiDimensionCacheStats(List dimensionNames, String tierDimensionValue) { + this(dimensionNames, tierDimensionValue, DEFAULT_MAX_DIMENSION_VALUES); + } + + public MultiDimensionCacheStats(StreamInput in) throws IOException { + this.dimensionNames = List.of(in.readStringArray()); + this.tierDimensionValue = in.readString(); + Map readMap = in.readMap( + i -> new Key(Set.of(i.readArray(CacheStatsDimension::new, CacheStatsDimension[]::new))), + CacheStatsResponse::new + ); + this.map = new ConcurrentHashMap(readMap); + this.totalStats = new CacheStatsResponse(in); + this.maxDimensionValues = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringArray(dimensionNames.toArray(new String[0])); + out.writeString(tierDimensionValue); + out.writeMap( + map, + (o, key) -> o.writeArray((o1, dim) -> ((CacheStatsDimension) dim).writeTo(o1), key.dimensions.toArray()), + (o, response) -> response.writeTo(o) + ); + totalStats.writeTo(out); + out.writeVInt(maxDimensionValues); + } + + @Override + public CacheStatsResponse getTotalStats() { + return totalStats; + } + + /** + * Get the stats response aggregated by dimensions. If there are no values for the specified dimensions, + * returns an all-zero response. + */ + @Override + public CacheStatsResponse getStatsByDimensions(List dimensions) { + if (!checkDimensionNames(dimensions)) { + throw new IllegalArgumentException("Can't get stats for unrecognized dimensions"); + } + + CacheStatsDimension tierDim = getTierDimension(dimensions); + if (tierDim == null || tierDim.dimensionValue.equals(tierDimensionValue)) { + // If there is no tier dimension, or if the tier dimension value matches the one for this stats object, return an aggregated + // response over the non-tier dimensions + List modifiedDimensions = new ArrayList<>(dimensions); + if (tierDim != null) { + modifiedDimensions.remove(tierDim); + } + + if (modifiedDimensions.size() == dimensionNames.size()) { + return map.getOrDefault(new Key(modifiedDimensions), new CacheStatsResponse()); + } + + // I don't think there's a more efficient way to get arbitrary combinations of dimensions than to just keep a map + // and iterate through it, checking if keys match. We can't pre-aggregate because it would consume a lot of memory. + CacheStatsResponse response = new CacheStatsResponse(); + for (Key key : map.keySet()) { + if (key.dimensions.containsAll(modifiedDimensions)) { + response.add(map.get(key)); + } + } + return response; + } + // If the tier dimension doesn't match, return an all-zero response + return new CacheStatsResponse(); + } + + private CacheStatsDimension getTierDimension(List dimensions) { + for (CacheStatsDimension dim : dimensions) { + if (dim.dimensionName.equals(CacheStatsDimension.TIER_DIMENSION_NAME)) { + return dim; + } + } + return null; + } + + private boolean checkDimensionNames(List dimensions) { + for (CacheStatsDimension dim : dimensions) { + if (!(dimensionNames.contains(dim.dimensionName) || dim.dimensionName.equals(CacheStatsDimension.TIER_DIMENSION_NAME))) { + // Reject dimension names that aren't in the list and aren't the tier dimension + return false; + } + } + return true; + } + + @Override + public long getTotalHits() { + return totalStats.getHits(); + } + + @Override + public long getTotalMisses() { + return totalStats.getMisses(); + } + + @Override + public long getTotalEvictions() { + return totalStats.getEvictions(); + } + + @Override + public long getTotalMemorySize() { + return totalStats.getMemorySize(); + } + + @Override + public long getTotalEntries() { + return totalStats.getEntries(); + } + + @Override + public long getHitsByDimensions(List dimensions) { + return getStatsByDimensions(dimensions).getHits(); + } + + @Override + public long getMissesByDimensions(List dimensions) { + return getStatsByDimensions(dimensions).getMisses(); + } + + @Override + public long getEvictionsByDimensions(List dimensions) { + return getStatsByDimensions(dimensions).getEvictions(); + } + + @Override + public long getMemorySizeByDimensions(List dimensions) { + return getStatsByDimensions(dimensions).getMemorySize(); + } + + @Override + public long getEntriesByDimensions(List dimensions) { + return getStatsByDimensions(dimensions).getEntries(); + } + + @Override + public void incrementHitsByDimensions(List dimensions) { + internalIncrement(dimensions, (response, amount) -> response.hits.inc(amount), 1); + } + + @Override + public void incrementMissesByDimensions(List dimensions) { + internalIncrement(dimensions, (response, amount) -> response.misses.inc(amount), 1); + } + + @Override + public void incrementEvictionsByDimensions(List dimensions) { + internalIncrement(dimensions, (response, amount) -> response.evictions.inc(amount), 1); + } + + @Override + public void incrementMemorySizeByDimensions(List dimensions, long amountBytes) { + internalIncrement(dimensions, (response, amount) -> response.memorySize.inc(amount), amountBytes); + } + + @Override + public void incrementEntriesByDimensions(List dimensions) { + internalIncrement(dimensions, (response, amount) -> response.entries.inc(amount), 1); + } + + @Override + public void decrementEntriesByDimensions(List dimensions) { + internalIncrement(dimensions, (response, amount) -> response.entries.inc(amount), -1); + } + + @Override + public void reset() { + for (Key key : map.keySet()) { + CacheStatsResponse response = map.get(key); + response.memorySize.dec(response.getMemorySize()); + response.entries.dec(response.getEntries()); + } + totalStats.memorySize.dec(totalStats.getMemorySize()); + totalStats.entries.dec(totalStats.getEntries()); + } + + private CacheStatsResponse internalGetStats(List dimensions) { + assert dimensions.size() == dimensionNames.size(); + CacheStatsResponse response = map.get(new Key(dimensions)); + if (response == null) { + if (map.size() < maxDimensionValues) { + response = new CacheStatsResponse(); + map.put(new Key(dimensions), response); + } else { + throw new RuntimeException("Cannot add new combination of dimension values to stats object; reached maximum"); + } + } + return response; + } + + private void internalIncrement(List dimensions, BiConsumer incrementer, long amount) { + CacheStatsResponse stats = internalGetStats(dimensions); + incrementer.accept(stats, amount); + incrementer.accept(totalStats, amount); + } + + /** + * Unmodifiable wrapper over a set of CacheStatsDimension. Pkg-private for testing. + */ + static class Key { + final Set dimensions; + + Key(Set dimensions) { + this.dimensions = Collections.unmodifiableSet(dimensions); + } + + Key(List dimensions) { + this(new HashSet<>(dimensions)); + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o == null) { + return false; + } + if (o.getClass() != Key.class) { + return false; + } + Key other = (Key) o; + return this.dimensions.equals(other.dimensions); + } + + @Override + public int hashCode() { + return this.dimensions.hashCode(); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java index d218903de5b6d..e242d084ec2a7 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java +++ b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java @@ -12,9 +12,13 @@ import org.opensearch.common.cache.CacheBuilder; import org.opensearch.common.cache.CacheType; import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.ICacheKey; import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.stats.CacheStats; +import org.opensearch.common.cache.stats.MultiDimensionCacheStats; import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; @@ -22,7 +26,9 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeValue; +import java.util.List; import java.util.Map; +import java.util.Objects; import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; @@ -33,13 +39,15 @@ * * @opensearch.experimental */ -public class OpenSearchOnHeapCache implements ICache, RemovalListener { - - private final Cache cache; - private final RemovalListener removalListener; +public class OpenSearchOnHeapCache implements ICache, RemovalListener, V> { + private final Cache, V> cache; + private CacheStats stats; + private final RemovalListener, V> removalListener; + private final List dimensionNames; + public static final String TIER_DIMENSION_VALUE = "on_heap"; public OpenSearchOnHeapCache(Builder builder) { - CacheBuilder cacheBuilder = CacheBuilder.builder() + CacheBuilder, V> cacheBuilder = CacheBuilder., V>builder() .setMaximumWeight(builder.getMaxWeightInBytes()) .weigher(builder.getWeigher()) .removalListener(this); @@ -47,44 +55,61 @@ public OpenSearchOnHeapCache(Builder builder) { cacheBuilder.setExpireAfterAccess(builder.getExpireAfterAcess()); } cache = cacheBuilder.build(); + this.dimensionNames = Objects.requireNonNull(builder.dimensionNames, "Dimension names can't be null"); + this.stats = new MultiDimensionCacheStats(dimensionNames, TIER_DIMENSION_VALUE); this.removalListener = builder.getRemovalListener(); } @Override - public V get(K key) { + public V get(ICacheKey key) { V value = cache.get(key); + if (value != null) { + stats.incrementHitsByDimensions(key.dimensions); + } else { + stats.incrementMissesByDimensions(key.dimensions); + } return value; } @Override - public void put(K key, V value) { + public void put(ICacheKey key, V value) { cache.put(key, value); + stats.incrementEntriesByDimensions(key.dimensions); + stats.incrementMemorySizeByDimensions(key.dimensions, cache.getWeigher().applyAsLong(key, value)); } @Override - public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { + public V computeIfAbsent(ICacheKey key, LoadAwareCacheLoader, V> loader) throws Exception { V value = cache.computeIfAbsent(key, key1 -> loader.load(key)); + if (!loader.isLoaded()) { + stats.incrementHitsByDimensions(key.dimensions); + } else { + stats.incrementMissesByDimensions(key.dimensions); + stats.incrementEntriesByDimensions(key.dimensions); + stats.incrementMemorySizeByDimensions(key.dimensions, cache.getWeigher().applyAsLong(key, value)); + } return value; } @Override - public void invalidate(K key) { + public void invalidate(ICacheKey key) { cache.invalidate(key); } @Override public void invalidateAll() { cache.invalidateAll(); + stats.reset(); } @Override - public Iterable keys() { + public Iterable> keys() { return cache.keys(); } @Override public long count() { - return cache.count(); + return stats.getTotalEntries(); } @Override @@ -96,8 +121,23 @@ public void refresh() { public void close() {} @Override - public void onRemoval(RemovalNotification notification) { - this.removalListener.onRemoval(notification); + public CacheStats stats() { + return stats; + } + + @Override + public void onRemoval(RemovalNotification, V> notification) { + removalListener.onRemoval(notification); + stats.decrementEntriesByDimensions(notification.getKey().dimensions); + stats.incrementMemorySizeByDimensions( + notification.getKey().dimensions, + -cache.getWeigher().applyAsLong(notification.getKey(), notification.getValue()) + ); + + if (RemovalReason.EVICTED.equals(notification.getRemovalReason()) + || RemovalReason.CAPACITY.equals(notification.getRemovalReason())) { + stats.incrementEvictionsByDimensions(notification.getKey().dimensions); + } } /** @@ -111,9 +151,11 @@ public static class OpenSearchOnHeapCacheFactory implements Factory { public ICache create(CacheConfig config, CacheType cacheType, Map cacheFactories) { Map> settingList = OpenSearchOnHeapCacheSettings.getSettingListForCacheType(cacheType); Settings settings = config.getSettings(); - return new Builder().setMaximumWeightInBytes( - ((ByteSizeValue) settingList.get(MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes() - ).setWeigher(config.getWeigher()).setRemovalListener(config.getRemovalListener()).build(); + return new Builder().setDimensionNames(config.getDimensionNames()) + .setMaximumWeightInBytes(((ByteSizeValue) settingList.get(MAXIMUM_SIZE_IN_BYTES_KEY).get(settings)).getBytes()) + .setWeigher(config.getWeigher()) + .setRemovalListener(config.getRemovalListener()) + .build(); } @Override @@ -129,6 +171,13 @@ public String getCacheName() { */ public static class Builder extends ICacheBuilder { + private List dimensionNames; + + public Builder setDimensionNames(List dimensionNames) { + this.dimensionNames = dimensionNames; + return this; + } + @Override public ICache build() { return new OpenSearchOnHeapCache(this); diff --git a/server/src/main/java/org/opensearch/common/cache/store/builders/ICacheBuilder.java b/server/src/main/java/org/opensearch/common/cache/store/builders/ICacheBuilder.java index 7ca9080ec1aa6..3fc43767a03e7 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/builders/ICacheBuilder.java +++ b/server/src/main/java/org/opensearch/common/cache/store/builders/ICacheBuilder.java @@ -10,6 +10,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.ICacheKey; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -28,13 +29,13 @@ public abstract class ICacheBuilder { private long maxWeightInBytes; - private ToLongBiFunction weigher; + private ToLongBiFunction, V> weigher; private TimeValue expireAfterAcess; private Settings settings; - private RemovalListener removalListener; + private RemovalListener, V> removalListener; public ICacheBuilder() {} @@ -43,7 +44,7 @@ public ICacheBuilder setMaximumWeightInBytes(long sizeInBytes) { return this; } - public ICacheBuilder setWeigher(ToLongBiFunction weigher) { + public ICacheBuilder setWeigher(ToLongBiFunction, V> weigher) { this.weigher = weigher; return this; } @@ -53,13 +54,13 @@ public ICacheBuilder setExpireAfterAccess(TimeValue expireAfterAcess) { return this; } - public ICacheBuilder setSettings(Settings settings) { - this.settings = settings; + public ICacheBuilder setRemovalListener(RemovalListener, V> listener) { + this.removalListener = listener; return this; } - public ICacheBuilder setRemovalListener(RemovalListener removalListener) { - this.removalListener = removalListener; + public ICacheBuilder setSettings(Settings settings) { + this.settings = settings; return this; } @@ -71,12 +72,12 @@ public TimeValue getExpireAfterAcess() { return expireAfterAcess; } - public ToLongBiFunction getWeigher() { + public ToLongBiFunction, V> getWeigher() { return weigher; } - public RemovalListener getRemovalListener() { - return this.removalListener; + public RemovalListener, V> getRemovalListener() { + return removalListener; } public Settings getSettings() { diff --git a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java index 6fefea6578fb9..2c2a93ae9ab67 100644 --- a/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java +++ b/server/src/main/java/org/opensearch/common/cache/store/config/CacheConfig.java @@ -9,9 +9,12 @@ package org.opensearch.common.cache.store.config; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.cache.ICacheKey; import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.serializer.Serializer; import org.opensearch.common.settings.Settings; +import java.util.List; import java.util.function.ToLongBiFunction; /** @@ -37,9 +40,14 @@ public class CacheConfig { /** * Represents a function that calculates the size or weight of a key-value pair. */ - private final ToLongBiFunction weigher; + private final ToLongBiFunction, V> weigher; - private final RemovalListener removalListener; + private final RemovalListener, V> removalListener; + + private final Serializer keySerializer; + private final Serializer valueSerializer; + + private final List dimensionNames; private CacheConfig(Builder builder) { this.keyType = builder.keyType; @@ -47,6 +55,13 @@ private CacheConfig(Builder builder) { this.settings = builder.settings; this.removalListener = builder.removalListener; this.weigher = builder.weigher; + this.keySerializer = builder.keySerializer; + this.valueSerializer = builder.valueSerializer; + this.dimensionNames = builder.dimensionNames; + } + + public RemovalListener, V> getRemovalListener() { + return removalListener; } public Class getKeyType() { @@ -61,12 +76,20 @@ public Settings getSettings() { return settings; } - public RemovalListener getRemovalListener() { - return removalListener; + public ToLongBiFunction, V> getWeigher() { + return weigher; } - public ToLongBiFunction getWeigher() { - return weigher; + public Serializer getKeySerializer() { + return keySerializer; + } + + public Serializer getValueSerializer() { + return valueSerializer; + } + + public List getDimensionNames() { + return dimensionNames; } /** @@ -75,16 +98,20 @@ public ToLongBiFunction getWeigher() { * @param Type of value. */ public static class Builder { - private Settings settings; private Class keyType; private Class valueType; - private RemovalListener removalListener; + private RemovalListener, V> removalListener; + + private ToLongBiFunction, V> weigher; + + private Serializer keySerializer; + private Serializer valueSerializer; - private ToLongBiFunction weigher; + private List dimensionNames; public Builder() {} @@ -103,16 +130,31 @@ public Builder setValueType(Class valueType) { return this; } - public Builder setRemovalListener(RemovalListener removalListener) { - this.removalListener = removalListener; + public Builder setRemovalListener(RemovalListener, V> listener) { + this.removalListener = listener; return this; } - public Builder setWeigher(ToLongBiFunction weigher) { + public Builder setWeigher(ToLongBiFunction, V> weigher) { this.weigher = weigher; return this; } + public Builder setKeySerializer(Serializer keySerializer) { + this.keySerializer = keySerializer; + return this; + } + + public Builder setValueSerializer(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + return this; + } + + public Builder setDimensionNames(List dimensionNames) { + this.dimensionNames = dimensionNames; + return this; + } + public CacheConfig build() { return new CacheConfig<>(this); } diff --git a/server/src/main/java/org/opensearch/indices/IRCKeyWriteableSerializer.java b/server/src/main/java/org/opensearch/indices/IRCKeyWriteableSerializer.java new file mode 100644 index 0000000000000..b83957d4a2508 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/IRCKeyWriteableSerializer.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.cache.serializer.Serializer; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; + +import java.io.IOException; +import java.util.Arrays; + +/** + * This class serializes the IndicesRequestCache.Key using its writeTo method. + */ +public class IRCKeyWriteableSerializer implements Serializer { + + public IRCKeyWriteableSerializer() {} + + @Override + public byte[] serialize(IndicesRequestCache.Key object) { + try { + BytesStreamOutput os = new BytesStreamOutput(); + object.writeTo(os); + return BytesReference.toBytes(os.bytes()); + } catch (IOException e) { + throw new OpenSearchException(e); + } + } + + @Override + public IndicesRequestCache.Key deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + try { + BytesStreamInput is = new BytesStreamInput(bytes, 0, bytes.length); + return new IndicesRequestCache.Key(is); + } catch (IOException e) { + throw new OpenSearchException(e); + } + } + + @Override + public boolean equals(IndicesRequestCache.Key object, byte[] bytes) { + // Deserialization is much slower than serialization for keys of order 1 KB, + // while time to serialize is fairly constant (per byte) + if (bytes.length < 5000) { + return Arrays.equals(serialize(object), bytes); + } else { + return object.equals(deserialize(bytes)); + } + } +} diff --git a/server/src/test/java/org/opensearch/common/cache/serializer/BytesReferenceSerializerTests.java b/server/src/test/java/org/opensearch/common/cache/serializer/BytesReferenceSerializerTests.java new file mode 100644 index 0000000000000..b1d9e762d5df7 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/serializer/BytesReferenceSerializerTests.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.serializer; + +import org.opensearch.common.Randomness; +import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.bytes.CompositeBytesReference; +import org.opensearch.core.common.util.ByteArray; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Random; + +public class BytesReferenceSerializerTests extends OpenSearchTestCase { + public void testEquality() throws Exception { + BytesReferenceSerializer ser = new BytesReferenceSerializer(); + // Test that values are equal before and after serialization, for each implementation of BytesReference. + byte[] bytesValue = new byte[1000]; + Random rand = Randomness.get(); + rand.nextBytes(bytesValue); + + BytesReference ba = new BytesArray(bytesValue); + byte[] serialized = ser.serialize(ba); + assertTrue(ser.equals(ba, serialized)); + BytesReference deserialized = ser.deserialize(serialized); + assertEquals(ba, deserialized); + + ba = new BytesArray(new byte[] {}); + serialized = ser.serialize(ba); + assertTrue(ser.equals(ba, serialized)); + deserialized = ser.deserialize(serialized); + assertEquals(ba, deserialized); + + BytesReference cbr = CompositeBytesReference.of(new BytesArray(bytesValue), new BytesArray(bytesValue)); + serialized = ser.serialize(cbr); + assertTrue(ser.equals(cbr, serialized)); + deserialized = ser.deserialize(serialized); + assertEquals(cbr, deserialized); + + // We need the PagedBytesReference to be larger than the page size (16 KB) in order to actually create it + byte[] pbrValue = new byte[PageCacheRecycler.PAGE_SIZE_IN_BYTES * 2]; + rand.nextBytes(pbrValue); + ByteArray arr = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(pbrValue.length); + arr.set(0L, pbrValue, 0, pbrValue.length); + assert !arr.hasArray(); + BytesReference pbr = BytesReference.fromByteArray(arr, pbrValue.length); + serialized = ser.serialize(pbr); + assertTrue(ser.equals(pbr, serialized)); + deserialized = ser.deserialize(serialized); + assertEquals(pbr, deserialized); + + BytesReference rbr = new ReleasableBytesReference(new BytesArray(bytesValue), ReleasableBytesReference.NO_OP); + serialized = ser.serialize(rbr); + assertTrue(ser.equals(rbr, serialized)); + deserialized = ser.deserialize(serialized); + assertEquals(rbr, deserialized); + } +} diff --git a/server/src/test/java/org/opensearch/common/cache/serializer/ICacheKeySerializerTests.java b/server/src/test/java/org/opensearch/common/cache/serializer/ICacheKeySerializerTests.java new file mode 100644 index 0000000000000..968d9dd64b01d --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/serializer/ICacheKeySerializerTests.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.serializer; + +import org.opensearch.common.Randomness; +import org.opensearch.common.cache.ICacheKey; +import org.opensearch.common.cache.stats.CacheStatsDimension; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +public class ICacheKeySerializerTests extends OpenSearchTestCase { + // For these tests, we use BytesReference as K, since we already have a Serializer implementation + public void testEquality() throws Exception { + BytesReferenceSerializer keySer = new BytesReferenceSerializer(); + ICacheKeySerializer serializer = new ICacheKeySerializer<>(keySer); + + int numDimensionsTested = 100; + for (int i = 0; i < numDimensionsTested; i++) { + CacheStatsDimension dim = getRandomDim(); + ICacheKey key = new ICacheKey<>(getRandomBytesReference(), List.of(dim)); + byte[] serialized = serializer.serialize(key); + assertTrue(serializer.equals(key, serialized)); + ICacheKey deserialized = serializer.deserialize(serialized); + assertEquals(key, deserialized); + assertTrue(serializer.equals(deserialized, serialized)); + } + } + + public void testDimNumbers() throws Exception { + BytesReferenceSerializer keySer = new BytesReferenceSerializer(); + ICacheKeySerializer serializer = new ICacheKeySerializer<>(keySer); + + for (int numDims : new int[] { 0, 5, 1000 }) { + List dims = new ArrayList<>(); + for (int j = 0; j < numDims; j++) { + dims.add(getRandomDim()); + } + ICacheKey key = new ICacheKey<>(getRandomBytesReference(), dims); + byte[] serialized = serializer.serialize(key); + assertTrue(serializer.equals(key, serialized)); + ICacheKey deserialized = serializer.deserialize(serialized); + assertEquals(key, deserialized); + } + } + + public void testHashCodes() throws Exception { + ICacheKey key1 = new ICacheKey<>("key", List.of(new CacheStatsDimension("dimension_name", "dimension_value"))); + ICacheKey key2 = new ICacheKey<>("key", List.of(new CacheStatsDimension("dimension_name", "dimension_value"))); + + assertEquals(key1, key2); + assertEquals(key1.hashCode(), key2.hashCode()); + } + + public void testNullInputs() throws Exception { + BytesReferenceSerializer keySer = new BytesReferenceSerializer(); + ICacheKeySerializer serializer = new ICacheKeySerializer<>(keySer); + + assertNull(serializer.deserialize(null)); + ICacheKey nullKey = new ICacheKey<>(null, List.of(getRandomDim())); + assertNull(serializer.serialize(nullKey)); + assertNull(serializer.serialize(null)); + assertNull(serializer.serialize(new ICacheKey<>(getRandomBytesReference(), null))); + } + + private CacheStatsDimension getRandomDim() { + return new CacheStatsDimension(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + + private BytesReference getRandomBytesReference() { + byte[] bytesValue = new byte[1000]; + Random rand = Randomness.get(); + rand.nextBytes(bytesValue); + return new BytesArray(bytesValue); + } +} diff --git a/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java b/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java index 9b821a3b2a9cb..9d39f8a43ea58 100644 --- a/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java +++ b/server/src/test/java/org/opensearch/common/cache/service/CacheServiceTests.java @@ -23,6 +23,8 @@ import java.util.Map; import static junit.framework.TestCase.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -42,7 +44,7 @@ public void testWithCreateCacheForIndicesRequestCacheType() { ); CacheConfig config = mock(CacheConfig.class); ICache onHeapCache = mock(OpenSearchOnHeapCache.class); - when(factory1.create(config, CacheType.INDICES_REQUEST_CACHE, factoryMap)).thenReturn(onHeapCache); + when(factory1.create(eq(config), eq(CacheType.INDICES_REQUEST_CACHE), any(Map.class))).thenReturn(onHeapCache); CacheService cacheService = cacheModule.getCacheService(); ICache ircCache = cacheService.createCache(config, CacheType.INDICES_REQUEST_CACHE); diff --git a/server/src/test/java/org/opensearch/common/cache/stats/CacheStatsDimensionTests.java b/server/src/test/java/org/opensearch/common/cache/stats/CacheStatsDimensionTests.java new file mode 100644 index 0000000000000..21c0c46991be5 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/stats/CacheStatsDimensionTests.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.stats; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.test.OpenSearchTestCase; + +public class CacheStatsDimensionTests extends OpenSearchTestCase { + public void testSerialization() throws Exception { + String name = "dimension_name"; + String value = "dimension_value"; + CacheStatsDimension dim = new CacheStatsDimension(name, value); + + BytesStreamOutput os = new BytesStreamOutput(); + dim.writeTo(os); + BytesStreamInput is = new BytesStreamInput(BytesReference.toBytes(os.bytes())); + CacheStatsDimension deserialized = new CacheStatsDimension(is); + + assertEquals(dim.dimensionName, deserialized.dimensionName); + assertEquals(dim.dimensionValue, deserialized.dimensionValue); + assertEquals(dim, deserialized); + } + + public void testEquality() throws Exception { + String name = "dimension_name"; + String value = "dimension_value"; + CacheStatsDimension dim = new CacheStatsDimension(name, value); + assertEquals(dim, new CacheStatsDimension(name, value)); + assertNotEquals(dim, new CacheStatsDimension("a", "b")); + assertNotEquals(dim, null); + assertNotEquals(dim, new CacheStatsDimension(null, null)); + } +} diff --git a/server/src/test/java/org/opensearch/common/cache/stats/MultiDimensionCacheStatsTests.java b/server/src/test/java/org/opensearch/common/cache/stats/MultiDimensionCacheStatsTests.java new file mode 100644 index 0000000000000..63f747d63ff08 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/stats/MultiDimensionCacheStatsTests.java @@ -0,0 +1,305 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.stats; + +import org.opensearch.common.Randomness; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.UUID; + +public class MultiDimensionCacheStatsTests extends OpenSearchTestCase { + String tierDimensionValue = "tier"; + + public void testSerialization() throws Exception { + List dimensionNames = List.of("dim1", "dim2"); + MultiDimensionCacheStats stats = new MultiDimensionCacheStats(dimensionNames, tierDimensionValue); + Map> usedDimensionValues = getUsedDimensionValues(stats, 10); + populateStats(stats, usedDimensionValues, 100, 10); + + BytesStreamOutput os = new BytesStreamOutput(); + stats.writeTo(os); + BytesStreamInput is = new BytesStreamInput(BytesReference.toBytes(os.bytes())); + MultiDimensionCacheStats deserialized = new MultiDimensionCacheStats(is); + assertEquals(stats.map, deserialized.map); + assertEquals(stats.totalStats, deserialized.totalStats); + assertEquals(stats.dimensionNames, deserialized.dimensionNames); + } + + public void testAddAndGet() throws Exception { + List dimensionNames = List.of("dim1", "dim2", "dim3", "dim4"); + MultiDimensionCacheStats stats = new MultiDimensionCacheStats(dimensionNames, tierDimensionValue); + Map> usedDimensionValues = getUsedDimensionValues(stats, 10); + + Map, CacheStatsResponse> expected = populateStats(stats, usedDimensionValues, 1000, 10); + // test gets for each distinct combination of values + for (Set dimSet : expected.keySet()) { + List dims = new ArrayList<>(dimSet); + CacheStatsResponse expectedResponse = expected.get(dimSet); + CacheStatsResponse actual = stats.getStatsByDimensions(dims); + assertEquals(expectedResponse, actual); + + assertEquals(expectedResponse.getHits(), stats.getHitsByDimensions(dims)); + assertEquals(expectedResponse.getMisses(), stats.getMissesByDimensions(dims)); + assertEquals(expectedResponse.getEvictions(), stats.getEvictionsByDimensions(dims)); + assertEquals(expectedResponse.getMemorySize(), stats.getMemorySizeByDimensions(dims)); + assertEquals(expectedResponse.getEntries(), stats.getEntriesByDimensions(dims)); + } + + // test gets for aggregations of values: for example, dim1="a", dim2="b", but dim3 and dim4 can be anything + // test a random subset of these, there are combinatorially many possibilities + for (int i = 0; i < 1000; i++) { + List aggregationDims = getRandomDimList( + stats.dimensionNames, + usedDimensionValues, + false, + Randomness.get() + ); + CacheStatsResponse expectedResponse = new CacheStatsResponse(); + for (Set dimSet : expected.keySet()) { + if (dimSet.containsAll(aggregationDims)) { + // Confirmed via debug we get a reasonable number of matching dimensions with this setup + expectedResponse.add(expected.get(dimSet)); + } + } + assertEquals(expectedResponse, stats.getStatsByDimensions(aggregationDims)); + + assertEquals(expectedResponse.getHits(), stats.getHitsByDimensions(aggregationDims)); + assertEquals(expectedResponse.getMisses(), stats.getMissesByDimensions(aggregationDims)); + assertEquals(expectedResponse.getEvictions(), stats.getEvictionsByDimensions(aggregationDims)); + assertEquals(expectedResponse.getMemorySize(), stats.getMemorySizeByDimensions(aggregationDims)); + assertEquals(expectedResponse.getEntries(), stats.getEntriesByDimensions(aggregationDims)); + } + + // test gets for total + + CacheStatsResponse expectedTotal = new CacheStatsResponse(); + for (Set dimSet : expected.keySet()) { + expectedTotal.add(expected.get(dimSet)); + } + assertEquals(expectedTotal, stats.getTotalStats()); + + assertEquals(expectedTotal.getHits(), stats.getTotalHits()); + assertEquals(expectedTotal.getMisses(), stats.getTotalMisses()); + assertEquals(expectedTotal.getEvictions(), stats.getTotalEvictions()); + assertEquals(expectedTotal.getMemorySize(), stats.getTotalMemorySize()); + assertEquals(expectedTotal.getEntries(), stats.getTotalEntries()); + } + + public void testExceedsCap() throws Exception { + List dimensionNames = List.of("dim1", "dim2", "dim3", "dim4"); + MultiDimensionCacheStats stats = new MultiDimensionCacheStats(dimensionNames, tierDimensionValue, 1000); + Map> usedDimensionValues = getUsedDimensionValues(stats, 100); + + // Try a few more than MAX_DIMENSION_VALUES times because there can be collisions in the randomly selected dimension values + assertThrows(RuntimeException.class, () -> populateStats(stats, usedDimensionValues, (int) (stats.maxDimensionValues * 1.1), 10)); + } + + public void testEmptyDimsList() throws Exception { + // If the dimension list is empty, the map should have only one entry, from the empty set -> the total stats. + MultiDimensionCacheStats stats = new MultiDimensionCacheStats(List.of(), tierDimensionValue); + Map> usedDimensionValues = getUsedDimensionValues(stats, 100); + populateStats(stats, usedDimensionValues, 10, 100); + assertEquals(stats.totalStats, stats.getStatsByDimensions(List.of())); + assertEquals(stats.getTotalHits(), stats.getHitsByDimensions(List.of())); + assertEquals(stats.getTotalMisses(), stats.getMissesByDimensions(List.of())); + assertEquals(stats.getTotalEvictions(), stats.getEvictionsByDimensions(List.of())); + assertEquals(stats.getTotalMemorySize(), stats.getMemorySizeByDimensions(List.of())); + assertEquals(stats.getTotalEntries(), stats.getEntriesByDimensions(List.of())); + assertEquals(1, stats.map.size()); + } + + public void testTierLogic() throws Exception { + List dimensionNames = List.of("dim1", "dim2", "dim3", "dim4"); + MultiDimensionCacheStats stats = new MultiDimensionCacheStats(dimensionNames, tierDimensionValue); + Map> usedDimensionValues = getUsedDimensionValues(stats, 10); + Map, CacheStatsResponse> expected = populateStats(stats, usedDimensionValues, 1000, 10); + + CacheStatsDimension tierDim = new CacheStatsDimension(CacheStatsDimension.TIER_DIMENSION_NAME, tierDimensionValue); + CacheStatsDimension wrongTierDim = new CacheStatsDimension(CacheStatsDimension.TIER_DIMENSION_NAME, "wrong_value"); + + for (int i = 0; i < 1000; i++) { + List aggregationDims = getRandomDimList( + stats.dimensionNames, + usedDimensionValues, + false, + Randomness.get() + ); + List aggDimsWithTier = new ArrayList<>(aggregationDims); + aggDimsWithTier.add(tierDim); + + List aggDimsWithWrongTier = new ArrayList<>(aggregationDims); + aggDimsWithWrongTier.add(wrongTierDim); + CacheStatsResponse expectedResponse = new CacheStatsResponse(); + for (Set dimSet : expected.keySet()) { + if (dimSet.containsAll(aggregationDims)) { + expectedResponse.add(expected.get(dimSet)); + } + } + assertEquals(expectedResponse, stats.getStatsByDimensions(aggregationDims)); + assertEquals(expectedResponse, stats.getStatsByDimensions(aggDimsWithTier)); + assertEquals(new CacheStatsResponse(), stats.getStatsByDimensions(aggDimsWithWrongTier)); + } + assertEquals(stats.getTotalStats(), stats.getStatsByDimensions(List.of(tierDim))); + assertEquals(new CacheStatsResponse(), stats.getStatsByDimensions(List.of(wrongTierDim))); + } + + public void testKeyEquality() throws Exception { + Set dims1 = new HashSet<>(); + dims1.add(new CacheStatsDimension("a", "1")); + dims1.add(new CacheStatsDimension("b", "2")); + dims1.add(new CacheStatsDimension("c", "3")); + MultiDimensionCacheStats.Key key1 = new MultiDimensionCacheStats.Key(dims1); + + List dims2 = new ArrayList<>(); + dims2.add(new CacheStatsDimension("c", "3")); + dims2.add(new CacheStatsDimension("a", "1")); + dims2.add(new CacheStatsDimension("b", "2")); + MultiDimensionCacheStats.Key key2 = new MultiDimensionCacheStats.Key(dims2); + + assertEquals(key1, key2); + assertEquals(key1.hashCode(), key2.hashCode()); + } + + public void testReset() throws Exception { + List dimensionNames = List.of("dim1", "dim2"); + MultiDimensionCacheStats stats = new MultiDimensionCacheStats(dimensionNames, tierDimensionValue); + Map> usedDimensionValues = getUsedDimensionValues(stats, 10); + Map, CacheStatsResponse> expected = populateStats(stats, usedDimensionValues, 100, 10); + + stats.reset(); + + for (Set dimSet : expected.keySet()) { + List dims = new ArrayList<>(dimSet); + CacheStatsResponse originalResponse = expected.get(dimSet); + originalResponse.memorySize = new CounterMetric(); + originalResponse.entries = new CounterMetric(); + CacheStatsResponse actual = stats.getStatsByDimensions(dims); + assertEquals(originalResponse, actual); + + assertEquals(originalResponse.getHits(), stats.getHitsByDimensions(dims)); + assertEquals(originalResponse.getMisses(), stats.getMissesByDimensions(dims)); + assertEquals(originalResponse.getEvictions(), stats.getEvictionsByDimensions(dims)); + assertEquals(originalResponse.getMemorySize(), stats.getMemorySizeByDimensions(dims)); + assertEquals(originalResponse.getEntries(), stats.getEntriesByDimensions(dims)); + } + + CacheStatsResponse expectedTotal = new CacheStatsResponse(); + for (Set dimSet : expected.keySet()) { + expectedTotal.add(expected.get(dimSet)); + } + expectedTotal.memorySize = new CounterMetric(); + expectedTotal.entries = new CounterMetric(); + assertEquals(expectedTotal, stats.getTotalStats()); + + assertEquals(expectedTotal.getHits(), stats.getTotalHits()); + assertEquals(expectedTotal.getMisses(), stats.getTotalMisses()); + assertEquals(expectedTotal.getEvictions(), stats.getTotalEvictions()); + assertEquals(expectedTotal.getMemorySize(), stats.getTotalMemorySize()); + assertEquals(expectedTotal.getEntries(), stats.getTotalEntries()); + } + + private Map> getUsedDimensionValues(MultiDimensionCacheStats stats, int numValuesPerDim) { + Map> usedDimensionValues = new HashMap<>(); + for (int i = 0; i < stats.dimensionNames.size(); i++) { + List values = new ArrayList<>(); + for (int j = 0; j < numValuesPerDim; j++) { + values.add(UUID.randomUUID().toString()); + } + usedDimensionValues.put(stats.dimensionNames.get(i), values); + } + return usedDimensionValues; + } + + private Map, CacheStatsResponse> populateStats( + MultiDimensionCacheStats stats, + Map> usedDimensionValues, + int numDistinctValuePairs, + int numRepetitionsPerValue + ) { + Map, CacheStatsResponse> expected = new HashMap<>(); + + Random rand = Randomness.get(); + for (int i = 0; i < numDistinctValuePairs; i++) { + List dimensions = getRandomDimList(stats.dimensionNames, usedDimensionValues, true, rand); + Set dimSet = new HashSet<>(dimensions); + if (expected.get(dimSet) == null) { + expected.put(dimSet, new CacheStatsResponse()); + } + + for (int j = 0; j < numRepetitionsPerValue; j++) { + + int numHitIncrements = rand.nextInt(10); + for (int k = 0; k < numHitIncrements; k++) { + stats.incrementHitsByDimensions(dimensions); + expected.get(new HashSet<>(dimensions)).hits.inc(); + } + + int numMissIncrements = rand.nextInt(10); + for (int k = 0; k < numMissIncrements; k++) { + stats.incrementMissesByDimensions(dimensions); + expected.get(new HashSet<>(dimensions)).misses.inc(); + } + + int numEvictionIncrements = rand.nextInt(10); + for (int k = 0; k < numEvictionIncrements; k++) { + stats.incrementEvictionsByDimensions(dimensions); + expected.get(new HashSet<>(dimensions)).evictions.inc(); + } + + int numMemorySizeIncrements = rand.nextInt(10); + for (int k = 0; k < numMemorySizeIncrements; k++) { + long memIncrementAmount = rand.nextInt(5000); + stats.incrementMemorySizeByDimensions(dimensions, memIncrementAmount); + expected.get(new HashSet<>(dimensions)).memorySize.inc(memIncrementAmount); + } + + int numEntryIncrements = rand.nextInt(9) + 1; + for (int k = 0; k < numEntryIncrements; k++) { + stats.incrementEntriesByDimensions(dimensions); + expected.get(new HashSet<>(dimensions)).entries.inc(); + } + + int numEntryDecrements = rand.nextInt(numEntryIncrements); + for (int k = 0; k < numEntryDecrements; k++) { + stats.decrementEntriesByDimensions(dimensions); + expected.get(new HashSet<>(dimensions)).entries.dec(); + } + } + } + return expected; + } + + private List getRandomDimList( + List dimensionNames, + Map> usedDimensionValues, + boolean pickValueForAllDims, + Random rand + ) { + List result = new ArrayList<>(); + for (String dimName : dimensionNames) { + if (pickValueForAllDims || rand.nextBoolean()) { // if pickValueForAllDims, always pick a value for each dimension, otherwise do + // so 50% of the time + int index = between(0, usedDimensionValues.get(dimName).size() - 1); + result.add(new CacheStatsDimension(dimName, usedDimensionValues.get(dimName).get(index))); + } + } + return result; + } +} diff --git a/server/src/test/java/org/opensearch/common/cache/store/OpenSearchOnHeapCacheTests.java b/server/src/test/java/org/opensearch/common/cache/store/OpenSearchOnHeapCacheTests.java new file mode 100644 index 0000000000000..b02195b67437d --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/store/OpenSearchOnHeapCacheTests.java @@ -0,0 +1,136 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store; + +import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.ICacheKey; +import org.opensearch.common.cache.LoadAwareCacheLoader; +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.stats.CacheStatsDimension; +import org.opensearch.common.cache.store.config.CacheConfig; +import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY; + +public class OpenSearchOnHeapCacheTests extends OpenSearchTestCase { + private final static long keyValueSize = 50; + private final static List dimensionNames = List.of("dim1", "dim2"); + + public void testStats() throws Exception { + MockRemovalListener listener = new MockRemovalListener<>(); + int maxKeys = between(10, 50); + int numEvicted = between(10, 20); + OpenSearchOnHeapCache cache = getCache(maxKeys, listener); + + List> keysAdded = new ArrayList<>(); + int numAdded = maxKeys + numEvicted; + for (int i = 0; i < numAdded; i++) { + ICacheKey key = getICacheKey(UUID.randomUUID().toString()); + keysAdded.add(key); + cache.computeIfAbsent(key, getLoadAwareCacheLoader()); + + assertEquals(i + 1, cache.stats().getTotalMisses()); + assertEquals(0, cache.stats().getTotalHits()); + assertEquals(Math.min(maxKeys, i + 1), cache.stats().getTotalEntries()); + assertEquals(Math.min(maxKeys, i + 1) * keyValueSize, cache.stats().getTotalMemorySize()); + assertEquals(Math.max(0, i + 1 - maxKeys), cache.stats().getTotalEvictions()); + } + // do gets from the last part of the list, which should be hits + for (int i = numAdded - maxKeys; i < numAdded; i++) { + cache.computeIfAbsent(keysAdded.get(i), getLoadAwareCacheLoader()); + int numHits = i + 1 - (numAdded - maxKeys); + + assertEquals(numAdded, cache.stats().getTotalMisses()); + assertEquals(numHits, cache.stats().getTotalHits()); + assertEquals(maxKeys, cache.stats().getTotalEntries()); + assertEquals(maxKeys * keyValueSize, cache.stats().getTotalMemorySize()); + assertEquals(numEvicted, cache.stats().getTotalEvictions()); + } + + // invalidate keys + for (int i = numAdded - maxKeys; i < numAdded; i++) { + cache.invalidate(keysAdded.get(i)); + int numInvalidated = i + 1 - (numAdded - maxKeys); + + assertEquals(numAdded, cache.stats().getTotalMisses()); + assertEquals(maxKeys, cache.stats().getTotalHits()); + assertEquals(maxKeys - numInvalidated, cache.stats().getTotalEntries()); + assertEquals((maxKeys - numInvalidated) * keyValueSize, cache.stats().getTotalMemorySize()); + assertEquals(numEvicted, cache.stats().getTotalEvictions()); + } + } + + private OpenSearchOnHeapCache getCache(int maxSizeKeys, MockRemovalListener listener) { + ICache.Factory onHeapCacheFactory = new OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory(); + Settings settings = Settings.builder() + .put( + OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE) + .get(MAXIMUM_SIZE_IN_BYTES_KEY) + .getKey(), + maxSizeKeys * keyValueSize + "b" + ) + .build(); + + CacheConfig cacheConfig = new CacheConfig.Builder().setKeyType(String.class) + .setValueType(String.class) + .setWeigher((k, v) -> keyValueSize) + .setRemovalListener(listener) + .setSettings(settings) + .setDimensionNames(dimensionNames) + .build(); + return (OpenSearchOnHeapCache) onHeapCacheFactory.create(cacheConfig, CacheType.INDICES_REQUEST_CACHE, null); + } + + private static class MockRemovalListener implements RemovalListener, V> { + CounterMetric numRemovals; + + MockRemovalListener() { + numRemovals = new CounterMetric(); + } + + @Override + public void onRemoval(RemovalNotification, V> notification) { + numRemovals.inc(); + } + } + + private ICacheKey getICacheKey(String key) { + List dims = new ArrayList<>(); + for (String dimName : dimensionNames) { + dims.add(new CacheStatsDimension(dimName, "0")); + } + return new ICacheKey<>(key, dims); + } + + private LoadAwareCacheLoader, String> getLoadAwareCacheLoader() { + return new LoadAwareCacheLoader<>() { + boolean isLoaded = false; + + @Override + public String load(ICacheKey key) { + isLoaded = true; + return UUID.randomUUID().toString(); + } + + @Override + public boolean isLoaded() { + return isLoaded; + } + }; + } +} diff --git a/server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java b/server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java new file mode 100644 index 0000000000000..af657dadd7a1a --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.indices; + +import org.opensearch.common.Randomness; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.util.Random; +import java.util.UUID; + +public class IRCKeyWriteableSerializerTests extends OpenSearchSingleNodeTestCase { + + public void testSerializer() throws Exception { + IndexService indexService = createIndex("test"); + IndexShard indexShard = indexService.getShardOrNull(0); + IRCKeyWriteableSerializer ser = new IRCKeyWriteableSerializer(); + + int NUM_KEYS = 1000; + int[] valueLengths = new int[] { 1000, 6000 }; // test both branches in equals() + Random rand = Randomness.get(); + for (int valueLength : valueLengths) { + for (int i = 0; i < NUM_KEYS; i++) { + IndicesRequestCache.Key key = getRandomIRCKey(valueLength, rand, indexShard.shardId()); + byte[] serialized = ser.serialize(key); + assertTrue(ser.equals(key, serialized)); + IndicesRequestCache.Key deserialized = ser.deserialize(serialized); + assertTrue(key.equals(deserialized)); + } + } + } + + private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random, ShardId shard) { + byte[] value = new byte[valueLength]; + for (int i = 0; i < valueLength; i++) { + value[i] = (byte) (random.nextInt(126 - 32) + 32); + } + BytesReference keyValue = new BytesArray(value); + return new IndicesRequestCache.Key(shard, keyValue, UUID.randomUUID().toString()); // same UUID source as used in real key + } +}