Skip to content

Commit

Permalink
Part 1 of modifying ehcache to take serializable Key
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Alfonsi committed Oct 16, 2023
1 parent ec729d0 commit 990ba9c
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public interface BytesReference extends Comparable<BytesReference>, ToXContentFragment, Serializable { // another lie!
public interface BytesReference extends Comparable<BytesReference>, ToXContentFragment {

/**
* Convert an {@link XContentBuilder} into a BytesReference. This method closes the builder,
Expand Down
8 changes: 5 additions & 3 deletions server/src/main/java/org/opensearch/indices/CachingTier.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@

import org.opensearch.common.cache.RemovalListener;

import java.io.IOException;

/**
* asdsadssa
* @param <K>
* @param <V>
*/
public interface CachingTier<K, V> {

V get(K key);
V get(K key) throws IOException;

void put(K key, V value);
void put(K key, V value) throws IOException;

V computeIfAbsent(K key, TieredCacheLoader<K, V> loader) throws Exception;

void invalidate(K key);
void invalidate(K key) throws IOException;

V compute(K key, TieredCacheLoader<K, V> loader) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,8 @@
package org.opensearch.indices;

public interface DiskCachingTier<K, V> extends CachingTier<K, V> {

/**
* Closes the disk tier.
*/
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,24 @@
import org.ehcache.Cache;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.metrics.CounterMetric;

import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;

public class EhcacheDiskCachingTier<K, V> implements CachingTier<K, V>, RemovalListener<K, V> {
public class EhcacheDiskCachingTier<K extends Writeable, V> implements DiskCachingTier<K, V>, RemovalListener<K, V> {

private final PersistentCacheManager cacheManager;
private final Cache<K, V> cache; // make private after debug

private final Class<K> keyType; // I think these are needed to pass to newCacheConfigurationBuilder
private final Cache<EhcacheKey, V> cache;
private final Class<K> keyType; // These are needed to pass to newCacheConfigurationBuilder
//private final Class<EhcacheKey<K>> ehcacheKeyType;
private final Class<V> valueType;
public final static String DISK_CACHE_FP = "disk_cache_tier"; // this should probably be defined somewhere else since we need to change security.policy based on its value
private RemovalListener<K, V> removalListener;
Expand All @@ -48,17 +56,25 @@ public class EhcacheDiskCachingTier<K, V> implements CachingTier<K, V>, RemovalL
private static final String cacheAlias = "diskTier";
private final boolean isPersistent;
private CounterMetric count; // number of entries in cache
private EhcacheEventListener listener;
private final EhcacheEventListener<K, V> listener;
// private RBMIntKeyLookupStore keystore;
// private CacheTierPolicy[] policies;
// private IndicesRequestCacheDiskTierPolicy policy;

public EhcacheDiskCachingTier(boolean isPersistent, long maxWeightInBytes, long maxKeystoreWeightInBytes, Class<K> keyType, Class<V> valueType) {
public EhcacheDiskCachingTier(
boolean isPersistent,
long maxWeightInBytes,
long maxKeystoreWeightInBytes,
Class<K> keyType,
//Class<EhcacheKey<K>> ehcacheKeyType,
Class<V> valueType
) {
this.isPersistent = isPersistent;
this.keyType = keyType;
//this.ehcacheKeyType = ehcacheKeyType;
this.valueType = valueType;
this.isPersistent = isPersistent;
this.count = new CounterMetric();
this.listener = new EhcacheEventListener<K, V>(this, this.count);
this.listener = new EhcacheEventListener<K, V>(this, this);
statsService = new DefaultStatisticsService();

// our EhcacheEventListener should receive events every time an entry is changed
Expand All @@ -80,10 +96,10 @@ public EhcacheDiskCachingTier(boolean isPersistent, long maxWeightInBytes, long
.using(threadConfig)
.with(CacheManagerBuilder.persistence(DISK_CACHE_FP))
.withCache(cacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder(
keyType, valueType, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, isPersistent))
EhcacheKey.class, valueType, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, isPersistent))
.withService(listenerConfig)
).build(true);
this.cache = cacheManager.getCache(cacheAlias, keyType, valueType);
this.cache = cacheManager.getCache(cacheAlias, EhcacheKey.class, valueType);
this.getTimeMillisEWMA = new ExponentiallyWeightedMovingAverage(GET_TIME_EWMA_ALPHA, 10);

// this.keystore = new RBMIntKeyLookupStore((int) Math.pow(2, 28), maxKeystoreWeightInBytes);
Expand All @@ -92,12 +108,12 @@ public EhcacheDiskCachingTier(boolean isPersistent, long maxWeightInBytes, long
}

@Override
public V get(K key) {
public V get(K key) throws IOException {
// I don't think we need to do the future stuff as the cache is threadsafe

// if (keystore.contains(key.hashCode()) {
long now = System.nanoTime();
V value = cache.get(key);
V value = cache.get(new EhcacheKey(key));
double tookTimeMillis = ((double) (System.nanoTime() - now)) / 1000000;
getTimeMillisEWMA.addValue(tookTimeMillis);
return value;
Expand All @@ -106,12 +122,12 @@ public V get(K key) {
}

@Override
public void put(K key, V value) {
public void put(K key, V value) throws IOException {
// No need to get old value, this is handled by EhcacheEventListener.

// CheckDataResult policyResult = policy.checkData(value)
// if (policyResult.isAccepted()) {
cache.put(key, value);
cache.put(new EhcacheKey(key), value);
// keystore.add(key.hashCode());
// else { do something with policyResult.deniedReason()? }
// }
Expand All @@ -123,12 +139,12 @@ public V computeIfAbsent(K key, TieredCacheLoader<K, V> loader) throws Exception
}

@Override
public void invalidate(K key) {
public void invalidate(K key) throws IOException {
// keep keystore check to avoid unneeded disk seek
// RemovalNotification is handled by EhcacheEventListener

// if (keystore.contains(key.hashCode()) {
cache.remove(key);
cache.remove(new EhcacheKey(key));
// keystore.remove(key.hashCode());
// }
}
Expand Down Expand Up @@ -185,6 +201,15 @@ public boolean isPersistent() {
return isPersistent;
}

public K convertEhcacheKeyToOriginal(EhcacheKey eKey) throws IOException {
BytesStreamInput is = new BytesStreamInput();
byte[] bytes = eKey.getBytes();
is.readBytes(bytes, 0, bytes.length);
// we somehow have to use the Reader thing in the Writeable interface
// otherwise its not generic
}

@Override
public void close() {
// Call this method after each test, otherwise the directory will stay locked and you won't be able to
// initialize another IndicesRequestCache (for example in the next test that runs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,34 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.indices.EhcacheDiskCachingTier;

// moved to another file for testing flexibility purposes

public class EhcacheEventListener<K, V> implements CacheEventListener<K, V> { // make it private after debugging
public class EhcacheEventListener<K extends Writeable, V> implements CacheEventListener<EhcacheKey, V> {
// Receives key-value pairs (BytesReference, BytesReference), but must transform into (Key, BytesReference)
// to send removal notifications
private RemovalListener<K, V> removalListener;
private CounterMetric counter;
EhcacheEventListener(RemovalListener<K, V> removalListener, CounterMetric counter) {
private EhcacheDiskCachingTier tier;
EhcacheEventListener(RemovalListener<K, V> removalListener, EhcacheDiskCachingTier tier) {
this.removalListener = removalListener;
this.counter = counter; // needed to handle count changes
this.tier = tier; // needed to handle count changes
}
@Override
public void onEvent(CacheEvent<? extends K, ? extends V> event) {
K key = event.getKey();
public void onEvent(CacheEvent<? extends EhcacheKey, ? extends V> event) {
EhcacheKey ehcacheKey = event.getKey();
V oldValue = event.getOldValue();
V newValue = event.getNewValue();
EventType eventType = event.getType();

System.out.println("I am eventing!!");

// handle changing count for the disk tier
if (oldValue == null && newValue != null) {
counter.inc();
tier.countInc();
} else if (oldValue != null && newValue == null) {
counter.dec();
} else {
int j; // breakpoint
tier.countDec();
}

// handle creating a RemovalReason, unless eventType is CREATED
Expand All @@ -63,6 +64,10 @@ public void onEvent(CacheEvent<? extends K, ? extends V> event) {
default:
reason = null;
}
removalListener.onRemoval(new RemovalNotification<K, V>(key, oldValue, reason));
try {
K key = tier.convertEhcacheKeyToOriginal(ehcacheKey);
removalListener.onRemoval(new RemovalNotification<K, V>(key, oldValue, reason));
} catch (Exception ignored) {}

}
}
54 changes: 54 additions & 0 deletions server/src/main/java/org/opensearch/indices/EhcacheKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices;

import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.BytesStreamInput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;

public class EhcacheKey implements Serializable {
// the IndicesRequestCache.Key is not Serializable, but it is Writeable.
// We use the output stream's bytes in this wrapper class and implement the appropriate interfaces/methods.
// Unfortunately it's not possible to define this class as EhcacheKey<K> and use that as ehcache keys,
// because of type erasure. However, the only context EhcacheKey objects would be compared to one another
// is when they are used for the same cache, so they will always refer to the same K.
private byte[] bytes;

public EhcacheKey(Writeable key) throws IOException {
BytesStreamOutput os = new BytesStreamOutput(); // Should we pass in an expected size? If so, how big?
key.writeTo(os);
this.bytes = BytesReference.toBytes(os.bytes());
}

public byte[] getBytes() {
return this.bytes;
}

@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof EhcacheKey)) {
return false;
}
EhcacheKey other = (EhcacheKey) o;
return Arrays.equals(this.bytes, other.bytes);
}

@Override
public int hashCode() {
return Arrays.hashCode(this.bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.core.common.io.stream.Writeable;

import java.util.Arrays;
import java.util.List;
Expand All @@ -22,10 +23,10 @@
* @param <K>
* @param <V>
*/
public class TieredCacheSpilloverStrategyHandler<K, V> implements TieredCacheHandler<K, V>, RemovalListener<K, V> {
public class TieredCacheSpilloverStrategyHandler<K extends Writeable, V> implements TieredCacheHandler<K, V>, RemovalListener<K, V> {

private final OnHeapCachingTier<K, V> onHeapCachingTier;
private final EhcacheDiskCachingTier<K, V> diskCachingTier; // changed for testing
private final DiskCachingTier<K, V> diskCachingTier; // changed for testing
private final TieredCacheEventListener<K, V> tieredCacheEventListener;

/**
Expand Down Expand Up @@ -109,7 +110,7 @@ public CachingTier<K, V> getOnHeapCachingTier() {
return this.onHeapCachingTier;
}

public EhcacheDiskCachingTier<K, V> getDiskCachingTier() { // change to CachingTier after debug
public DiskCachingTier<K, V> getDiskCachingTier() { // change to CachingTier after debug
return this.diskCachingTier;
}

Expand Down Expand Up @@ -147,7 +148,7 @@ public static class CacheValue<V> {
}
}

public static class Builder<K, V> {
public static class Builder<K extends Writeable, V> {
private OnHeapCachingTier<K, V> onHeapCachingTier;
private CachingTier<K, V> diskCachingTier;
private TieredCacheEventListener<K, V> tieredCacheEventListener;
Expand Down

0 comments on commit 990ba9c

Please sign in to comment.