diff --git a/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java b/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java index 60613d4e2a997..e07aca89339b9 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java +++ b/libs/core/src/main/java/org/opensearch/core/common/bytes/BytesReference.java @@ -55,7 +55,7 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public interface BytesReference extends Comparable, ToXContentFragment, Serializable { // another lie! +public interface BytesReference extends Comparable, ToXContentFragment { /** * Convert an {@link XContentBuilder} into a BytesReference. This method closes the builder, diff --git a/server/src/main/java/org/opensearch/indices/CachingTier.java b/server/src/main/java/org/opensearch/indices/CachingTier.java index 85596929cfd6b..8c0dc0936f9dc 100644 --- a/server/src/main/java/org/opensearch/indices/CachingTier.java +++ b/server/src/main/java/org/opensearch/indices/CachingTier.java @@ -10,6 +10,8 @@ import org.opensearch.common.cache.RemovalListener; +import java.io.IOException; + /** * asdsadssa * @param @@ -17,13 +19,13 @@ */ public interface CachingTier { - V get(K key); + V get(K key) throws IOException; - void put(K key, V value); + void put(K key, V value) throws IOException; V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception; - void invalidate(K key); + void invalidate(K key) throws IOException; V compute(K key, TieredCacheLoader loader) throws Exception; diff --git a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java index efd9a459cd338..de88c44cc1e8f 100644 --- a/server/src/main/java/org/opensearch/indices/DiskCachingTier.java +++ b/server/src/main/java/org/opensearch/indices/DiskCachingTier.java @@ -9,5 +9,8 @@ package org.opensearch.indices; public interface DiskCachingTier extends CachingTier { - + /** + * Closes the disk tier. + */ + void close(); } diff --git a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java index 2142b8f3b0930..4a34fea847981 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java @@ -27,16 +27,24 @@ import org.ehcache.Cache; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.metrics.CounterMetric; - +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Collections; -public class EhcacheDiskCachingTier implements CachingTier, RemovalListener { +public class EhcacheDiskCachingTier implements DiskCachingTier, RemovalListener { private final PersistentCacheManager cacheManager; - private final Cache cache; // make private after debug - - private final Class keyType; // I think these are needed to pass to newCacheConfigurationBuilder + private final Cache cache; + private final Class keyType; // These are needed to pass to newCacheConfigurationBuilder + //private final Class> ehcacheKeyType; private final Class valueType; public final static String DISK_CACHE_FP = "disk_cache_tier"; // this should probably be defined somewhere else since we need to change security.policy based on its value private RemovalListener removalListener; @@ -48,17 +56,25 @@ public class EhcacheDiskCachingTier implements CachingTier, RemovalL private static final String cacheAlias = "diskTier"; private final boolean isPersistent; private CounterMetric count; // number of entries in cache - private EhcacheEventListener listener; + private final EhcacheEventListener listener; // private RBMIntKeyLookupStore keystore; // private CacheTierPolicy[] policies; // private IndicesRequestCacheDiskTierPolicy policy; - public EhcacheDiskCachingTier(boolean isPersistent, long maxWeightInBytes, long maxKeystoreWeightInBytes, Class keyType, Class valueType) { + public EhcacheDiskCachingTier( + boolean isPersistent, + long maxWeightInBytes, + long maxKeystoreWeightInBytes, + Class keyType, + //Class> ehcacheKeyType, + Class valueType + ) { + this.isPersistent = isPersistent; this.keyType = keyType; + //this.ehcacheKeyType = ehcacheKeyType; this.valueType = valueType; - this.isPersistent = isPersistent; this.count = new CounterMetric(); - this.listener = new EhcacheEventListener(this, this.count); + this.listener = new EhcacheEventListener(this, this); statsService = new DefaultStatisticsService(); // our EhcacheEventListener should receive events every time an entry is changed @@ -80,10 +96,10 @@ public EhcacheDiskCachingTier(boolean isPersistent, long maxWeightInBytes, long .using(threadConfig) .with(CacheManagerBuilder.persistence(DISK_CACHE_FP)) .withCache(cacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder( - keyType, valueType, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, isPersistent)) + EhcacheKey.class, valueType, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, isPersistent)) .withService(listenerConfig) ).build(true); - this.cache = cacheManager.getCache(cacheAlias, keyType, valueType); + this.cache = cacheManager.getCache(cacheAlias, EhcacheKey.class, valueType); this.getTimeMillisEWMA = new ExponentiallyWeightedMovingAverage(GET_TIME_EWMA_ALPHA, 10); // this.keystore = new RBMIntKeyLookupStore((int) Math.pow(2, 28), maxKeystoreWeightInBytes); @@ -92,12 +108,12 @@ public EhcacheDiskCachingTier(boolean isPersistent, long maxWeightInBytes, long } @Override - public V get(K key) { + public V get(K key) throws IOException { // I don't think we need to do the future stuff as the cache is threadsafe // if (keystore.contains(key.hashCode()) { long now = System.nanoTime(); - V value = cache.get(key); + V value = cache.get(new EhcacheKey(key)); double tookTimeMillis = ((double) (System.nanoTime() - now)) / 1000000; getTimeMillisEWMA.addValue(tookTimeMillis); return value; @@ -106,12 +122,12 @@ public V get(K key) { } @Override - public void put(K key, V value) { + public void put(K key, V value) throws IOException { // No need to get old value, this is handled by EhcacheEventListener. // CheckDataResult policyResult = policy.checkData(value) // if (policyResult.isAccepted()) { - cache.put(key, value); + cache.put(new EhcacheKey(key), value); // keystore.add(key.hashCode()); // else { do something with policyResult.deniedReason()? } // } @@ -123,12 +139,12 @@ public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception } @Override - public void invalidate(K key) { + public void invalidate(K key) throws IOException { // keep keystore check to avoid unneeded disk seek // RemovalNotification is handled by EhcacheEventListener // if (keystore.contains(key.hashCode()) { - cache.remove(key); + cache.remove(new EhcacheKey(key)); // keystore.remove(key.hashCode()); // } } @@ -185,6 +201,15 @@ public boolean isPersistent() { return isPersistent; } + public K convertEhcacheKeyToOriginal(EhcacheKey eKey) throws IOException { + BytesStreamInput is = new BytesStreamInput(); + byte[] bytes = eKey.getBytes(); + is.readBytes(bytes, 0, bytes.length); + // we somehow have to use the Reader thing in the Writeable interface + // otherwise its not generic + } + + @Override public void close() { // Call this method after each test, otherwise the directory will stay locked and you won't be able to // initialize another IndicesRequestCache (for example in the next test that runs) diff --git a/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java b/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java index c6d2790dda95f..6c355dba025e1 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java @@ -15,33 +15,34 @@ import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.indices.EhcacheDiskCachingTier; // moved to another file for testing flexibility purposes -public class EhcacheEventListener implements CacheEventListener { // make it private after debugging +public class EhcacheEventListener implements CacheEventListener { + // Receives key-value pairs (BytesReference, BytesReference), but must transform into (Key, BytesReference) + // to send removal notifications private RemovalListener removalListener; - private CounterMetric counter; - EhcacheEventListener(RemovalListener removalListener, CounterMetric counter) { + private EhcacheDiskCachingTier tier; + EhcacheEventListener(RemovalListener removalListener, EhcacheDiskCachingTier tier) { this.removalListener = removalListener; - this.counter = counter; // needed to handle count changes + this.tier = tier; // needed to handle count changes } @Override - public void onEvent(CacheEvent event) { - K key = event.getKey(); + public void onEvent(CacheEvent event) { + EhcacheKey ehcacheKey = event.getKey(); V oldValue = event.getOldValue(); V newValue = event.getNewValue(); EventType eventType = event.getType(); - System.out.println("I am eventing!!"); - // handle changing count for the disk tier if (oldValue == null && newValue != null) { - counter.inc(); + tier.countInc(); } else if (oldValue != null && newValue == null) { - counter.dec(); - } else { - int j; // breakpoint + tier.countDec(); } // handle creating a RemovalReason, unless eventType is CREATED @@ -63,6 +64,10 @@ public void onEvent(CacheEvent event) { default: reason = null; } - removalListener.onRemoval(new RemovalNotification(key, oldValue, reason)); + try { + K key = tier.convertEhcacheKeyToOriginal(ehcacheKey); + removalListener.onRemoval(new RemovalNotification(key, oldValue, reason)); + } catch (Exception ignored) {} + } } diff --git a/server/src/main/java/org/opensearch/indices/EhcacheKey.java b/server/src/main/java/org/opensearch/indices/EhcacheKey.java new file mode 100644 index 0000000000000..b0a8d660b98f6 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/EhcacheKey.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +public class EhcacheKey implements Serializable { + // the IndicesRequestCache.Key is not Serializable, but it is Writeable. + // We use the output stream's bytes in this wrapper class and implement the appropriate interfaces/methods. + // Unfortunately it's not possible to define this class as EhcacheKey and use that as ehcache keys, + // because of type erasure. However, the only context EhcacheKey objects would be compared to one another + // is when they are used for the same cache, so they will always refer to the same K. + private byte[] bytes; + + public EhcacheKey(Writeable key) throws IOException { + BytesStreamOutput os = new BytesStreamOutput(); // Should we pass in an expected size? If so, how big? + key.writeTo(os); + this.bytes = BytesReference.toBytes(os.bytes()); + } + + public byte[] getBytes() { + return this.bytes; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof EhcacheKey)) { + return false; + } + EhcacheKey other = (EhcacheKey) o; + return Arrays.equals(this.bytes, other.bytes); + } + + @Override + public int hashCode() { + return Arrays.hashCode(this.bytes); + } +} diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java index 589c7dcfd9887..c240162423ee5 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java +++ b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java @@ -11,6 +11,7 @@ import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; +import org.opensearch.core.common.io.stream.Writeable; import java.util.Arrays; import java.util.List; @@ -22,10 +23,10 @@ * @param * @param */ -public class TieredCacheSpilloverStrategyHandler implements TieredCacheHandler, RemovalListener { +public class TieredCacheSpilloverStrategyHandler implements TieredCacheHandler, RemovalListener { private final OnHeapCachingTier onHeapCachingTier; - private final EhcacheDiskCachingTier diskCachingTier; // changed for testing + private final DiskCachingTier diskCachingTier; // changed for testing private final TieredCacheEventListener tieredCacheEventListener; /** @@ -109,7 +110,7 @@ public CachingTier getOnHeapCachingTier() { return this.onHeapCachingTier; } - public EhcacheDiskCachingTier getDiskCachingTier() { // change to CachingTier after debug + public DiskCachingTier getDiskCachingTier() { // change to CachingTier after debug return this.diskCachingTier; } @@ -147,7 +148,7 @@ public static class CacheValue { } } - public static class Builder { + public static class Builder { private OnHeapCachingTier onHeapCachingTier; private CachingTier diskCachingTier; private TieredCacheEventListener tieredCacheEventListener;