Skip to content

Commit

Permalink
Adds Serializer interface for use in ehcache disk tier and elsewhere;…
Browse files Browse the repository at this point in the history
… modifies existing disk tier impl to use it in a generic way

Signed-off-by: Peter Alfonsi <[email protected]>

Fixed byte[] key implementation to use ByteBuffer wrapper passed directly to ehcache

Signed-off-by: Peter Alfonsi <[email protected]>

Added tests for BytesReference serializer, and ehcache disk tier using BytesReference as a value
  • Loading branch information
Peter Alfonsi committed Nov 2, 2023
1 parent 27cc265 commit e39514b
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.tier;

import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;

import java.io.IOException;
import java.util.Arrays;

/**
 * A {@link Serializer} that converts between {@link BytesReference} and {@code byte[]}.
 * <p>
 * This class does not get passed to ehcache itself, so it's not required that classes match after
 * deserialization: {@link #deserialize(byte[])} always produces a {@link BytesArray}, regardless of which
 * {@link BytesReference} implementation was originally serialized.
 * <p>
 * All methods are null-tolerant: a {@code null} input maps to a {@code null} output, so a caller that hands
 * over the result of a cache lookup which found nothing gets {@code null} back instead of an exception.
 */
public class BytesReferenceSerializer implements Serializer<BytesReference, byte[]> {

    public BytesReferenceSerializer() {}

    /**
     * Copies the content of the reference into a byte array.
     * @param object The reference to serialize; may be null.
     * @return The bytes of the reference, or null if {@code object} is null.
     */
    @Override
    public byte[] serialize(BytesReference object) {
        if (object == null) {
            return null;
        }
        return BytesReference.toBytes(object);
    }

    /**
     * Wraps a byte array in a {@link BytesArray}.
     * @param bytes The serialized representation; may be null (e.g. when there was no stored value).
     * @return A {@link BytesArray} backed by {@code bytes}, or null if {@code bytes} is null.
     */
    @Override
    public BytesReference deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        return new BytesArray(bytes);
    }

    /**
     * Compares a reference to a serialized representation by content.
     * @param object A non-serialized reference; may be null.
     * @param bytes Serialized representation; may be null.
     * @return true if serializing {@code object} yields exactly {@code bytes} (two nulls compare equal).
     */
    @Override
    public boolean equals(BytesReference object, byte[] bytes) {
        return Arrays.equals(serialize(object), bytes);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.common.cache.tier;

import org.ehcache.core.spi.service.FileBasedPersistenceContext;
import org.ehcache.spi.serialization.SerializerException;
import org.opensearch.OpenSearchException;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
Expand All @@ -18,6 +20,7 @@
import org.opensearch.common.unit.TimeValue;

import java.io.File;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Iterator;
import java.util.NoSuchElementException;
Expand All @@ -40,13 +43,25 @@
import org.ehcache.expiry.ExpiryPolicy;
import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration;

/**
* This ehcache disk caching tier uses its value serializer outside ehcache.
* Values are transformed to byte[] outside ehcache and then ehcache uses its bundled byte[] serializer.
* The key serializer you pass to this class produces a byte[]. This serializer is passed to a wrapper which
* implements Ehcache's Serializer interface and produces a ByteBuffer. The wrapper instance is then passed to ehcache.
* This is done because to get keys on a disk tier, ehcache internally checks the equals() method of the serializer,
* but ALSO requires newKey.equals(storedKey) (this isn't documented), which is the case for ByteBuffer but not byte[].
* This limitation means that the key serializer must preserve the class of the key before/after serialization,
* but the value serializer does not have to do this.
* @param <K> The key type of cache entries
* @param <V> The value type of cache entries
*/
public class EhCacheDiskCachingTier<K, V> implements DiskCachingTier<K, V> {

// A Cache manager can create many caches.
private final PersistentCacheManager cacheManager;

// Disk cache
private Cache<K, V> cache;
private Cache<K, byte[]> cache;
private final long maxWeightInBytes;
private final String storagePath;

Expand Down Expand Up @@ -86,11 +101,16 @@ public class EhCacheDiskCachingTier<K, V> implements DiskCachingTier<K, V> {
// will hold that many file pointers.
public final Setting<Integer> DISK_SEGMENTS;

private final Serializer<K, byte[]> keySerializer;
private final Serializer<V, byte[]> valueSerializer;

private EhCacheDiskCachingTier(Builder<K, V> builder) {
this.keyType = Objects.requireNonNull(builder.keyType, "Key type shouldn't be null");
this.valueType = Objects.requireNonNull(builder.valueType, "Value type shouldn't be null");
this.expireAfterAccess = Objects.requireNonNull(builder.expireAfterAcess, "ExpireAfterAccess value shouldn't " + "be null");
this.ehCacheEventListener = new EhCacheEventListener<K, V>();
this.keySerializer = Objects.requireNonNull(builder.keySerializer, "Key serializer shouldn't be null");
this.valueSerializer = Objects.requireNonNull(builder.valueSerializer, "Value serializer shouldn't be null");
this.ehCacheEventListener = new EhCacheEventListener<K, V>(this.valueSerializer);
this.maxWeightInBytes = builder.maxWeightInBytes;
this.storagePath = Objects.requireNonNull(builder.storagePath, "Storage path shouldn't be null");
if (builder.threadPoolAlias == null || builder.threadPoolAlias.isBlank()) {
Expand Down Expand Up @@ -124,37 +144,38 @@ private PersistentCacheManager buildCacheManager() {
.build(true);
}

private Cache<K, V> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
private Cache<K, byte[]> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
return this.cacheManager.createCache(
DISK_CACHE_ALIAS,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
this.keyType,
this.valueType,
keyType,
byte[].class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<K, V>() {
).withExpiry(new ExpiryPolicy<K, byte[]>() {
@Override
public Duration getExpiryForCreation(K key, V value) {
public Duration getExpiryForCreation(K key, byte[] value) {
return INFINITE;
}

@Override
public Duration getExpiryForAccess(K key, Supplier<? extends V> value) {
public Duration getExpiryForAccess(K key, Supplier<? extends byte[]> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(K key, Supplier<? extends V> oldValue, V newValue) {
public Duration getExpiryForUpdate(K key, Supplier<? extends byte[]> oldValue, byte[] newValue) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
DISK_WRITE_CONCURRENCY.get(settings),
DISK_SEGMENTS.get(settings)
)
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
DISK_WRITE_CONCURRENCY.get(settings),
DISK_SEGMENTS.get(settings)
)
)
.withKeySerializer(new KeySerializerWrapper<K>(keySerializer))
);
}

Expand All @@ -177,12 +198,12 @@ private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder<
@Override
public V get(K key) {
// Optimize it by adding key store.
return cache.get(key);
return valueSerializer.deserialize(cache.get(key));
}

@Override
public void put(K key, V value) {
cache.put(key, value);
cache.put(key, valueSerializer.serialize(value));
}

@Override
Expand All @@ -193,7 +214,7 @@ public V computeIfAbsent(K key, TieredCacheLoader<K, V> loader) throws Exception

@Override
public void invalidate(K key) {
// There seems to be an thread leak issue while calling this and then closing cache.
// There seems to be a thread leak issue while calling this and then closing cache.
cache.remove(key);
}

Expand Down Expand Up @@ -244,18 +265,23 @@ public void close() {
* @param <K> Type of key
* @param <V> Type of value
*/
class EhCacheEventListener<K, V> implements CacheEventListener<K, V> {
class EhCacheEventListener<K, V> implements CacheEventListener<K, byte[]> {

private Optional<RemovalListener<K, V>> removalListener;
// We need to pass the value serializer to this listener, as the removal listener is expecting
// values of type K, V, not K, byte[]
private Serializer<V, byte[]> valueSerializer;

EhCacheEventListener() {}
EhCacheEventListener(Serializer<V, byte[]> valueSerializer) {
this.valueSerializer = valueSerializer;
}

public void setRemovalListener(RemovalListener<K, V> removalListener) {
this.removalListener = Optional.ofNullable(removalListener);
}

@Override
public void onEvent(CacheEvent<? extends K, ? extends V> event) {
public void onEvent(CacheEvent<? extends K, ? extends byte[]> event) {
switch (event.getType()) {
case CREATED:
count.inc();
Expand All @@ -264,7 +290,11 @@ public void onEvent(CacheEvent<? extends K, ? extends V> event) {
case EVICTED:
this.removalListener.ifPresent(
listener -> listener.onRemoval(
new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.EVICTED)
new RemovalNotification<>(
event.getKey(),
valueSerializer.deserialize(event.getOldValue()),
RemovalReason.EVICTED
)
)
);
count.dec();
Expand All @@ -277,7 +307,11 @@ public void onEvent(CacheEvent<? extends K, ? extends V> event) {
case EXPIRED:
this.removalListener.ifPresent(
listener -> listener.onRemoval(
new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.INVALIDATED)
new RemovalNotification<>(
event.getKey(),
valueSerializer.deserialize(event.getOldValue()),
RemovalReason.INVALIDATED
)
)
);
count.dec();
Expand All @@ -297,9 +331,9 @@ public void onEvent(CacheEvent<? extends K, ? extends V> event) {
*/
class EhCacheKeyIterator<K> implements Iterator<K> {

Iterator<Cache.Entry<K, V>> iterator;
Iterator<Cache.Entry<K, byte[]>> iterator;

EhCacheKeyIterator(Iterator<Cache.Entry<K, V>> iterator) {
EhCacheKeyIterator(Iterator<Cache.Entry<K, byte[]>> iterator) {
this.iterator = iterator;
}

Expand All @@ -317,6 +351,40 @@ public K next() {
}
}

/**
* The wrapper for the key serializer which is passed directly to Ehcache.
*/
private class KeySerializerWrapper<K> implements org.ehcache.spi.serialization.Serializer<K> {
public Serializer<K, byte[]> serializer;
public KeySerializerWrapper(Serializer<K, byte[]> serializer) {
this.serializer = serializer;
}

// This constructor must be present, but does not have to work as we are not actually persisting the disk
// cache after a restart.
// See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {}

@Override
public ByteBuffer serialize(K object) throws SerializerException {
return ByteBuffer.wrap(serializer.serialize(object));
}

@Override
public K read(ByteBuffer binary) throws ClassNotFoundException, SerializerException {
byte[] arr = new byte[binary.remaining()];
binary.get(arr);
return serializer.deserialize(arr);
}

@Override
public boolean equals(K object, ByteBuffer binary) throws ClassNotFoundException, SerializerException {
byte[] arr = new byte[binary.remaining()];
binary.get(arr);
return serializer.equals(object, arr);
}
}

/**
* Builder object to build Ehcache disk tier.
* @param <K> Type of key
Expand All @@ -342,6 +410,8 @@ public static class Builder<K, V> {

// Provides capability to make ehCache event listener to run in sync mode. Used for testing too.
private boolean isEventListenerModeSync;
private Serializer<K, byte[]> keySerializer;
private Serializer<V, byte[]> valueSerializer;

public Builder() {}

Expand Down Expand Up @@ -399,6 +469,16 @@ public EhCacheDiskCachingTier.Builder<K, V> setIsEventListenerModeSync(boolean i
return this;
}

/**
* Sets the serializer used to convert keys to and from byte[].
* The tier's constructor rejects a null key serializer, so this must be set before calling build().
* @param keySerializer The key serializer
* @return This builder, for chaining
*/
public EhCacheDiskCachingTier.Builder<K, V> setKeySerializer(Serializer<K, byte[]> keySerializer) {
this.keySerializer = keySerializer;
return this;
}

/**
* Sets the serializer used to convert values to and from byte[].
* The tier's constructor rejects a null value serializer, so this must be set before calling build().
* @param valueSerializer The value serializer
* @return This builder, for chaining
*/
public EhCacheDiskCachingTier.Builder<K, V> setValueSerializer(Serializer<V, byte[]> valueSerializer) {
this.valueSerializer = valueSerializer;
return this;
}

/**
* Creates a new disk tier from the values currently held by this builder.
* The tier's constructor null-checks several builder fields (among them the key/value types,
* the key/value serializers and the storage path) and throws if one of them is missing.
* @return A new EhCacheDiskCachingTier
*/
public EhCacheDiskCachingTier<K, V> build() {
return new EhCacheDiskCachingTier<>(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.tier;

import java.io.IOException;

/**
* An interface for serializers, to be used in disk caching tier and elsewhere.
* T is the class of the original object, and U is the serialized class.
* This interface does not itself specify how null arguments are treated; that is left to implementations.
*
* @param <T> The class of the original (non-serialized) object
* @param <U> The class of the serialized representation
*/
public interface Serializer<T, U> {
/**
* Serializes an object.
* @param object A non-serialized object.
* @return The serialized representation of the object.
*/
U serialize(T object);

/**
* Deserializes a serialized representation back into an object.
* @param bytes The serialized representation.
* @return The original object.
*/
T deserialize(U bytes);

/**
* Compares an object to a serialized representation of an object.
* @param object A non-serialized object
* @param bytes Serialized representation of an object
* @return true if representing the same object, false if not
*/
boolean equals(T object, U bytes);
}
Loading

0 comments on commit e39514b

Please sign in to comment.