Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Serializer interface for use in ehcache disk tier and elsewhere;… #9

Open
wants to merge 3 commits into
base: sagars-ehcache-final
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.tier;

import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;

import java.io.IOException;
import java.util.Arrays;

/**
 * A {@link Serializer} that converts a {@link BytesReference} to a plain {@code byte[]} and back.
 * <p>
 * This class does not get passed to ehcache itself, so it's not required that classes match after
 * deserialization: {@link #deserialize(byte[])} always produces a {@link BytesArray}, whatever concrete
 * {@link BytesReference} implementation was originally serialized.
 */
public class BytesReferenceSerializer implements Serializer<BytesReference, byte[]> {

    public BytesReferenceSerializer() {}

    /**
     * Serializes a reference into a byte array.
     * @param object The reference to serialize.
     * @return The bytes produced by {@link BytesReference#toBytes(BytesReference)}.
     */
    @Override
    public byte[] serialize(BytesReference object) {
        return BytesReference.toBytes(object);
    }

    /**
     * Wraps serialized bytes in a {@link BytesArray}.
     * @param bytes The serialized representation; may be null when the caller passes through the
     *              result of a lookup that found nothing.
     * @return A reference wrapping {@code bytes}, or null if {@code bytes} is null.
     */
    @Override
    public BytesReference deserialize(byte[] bytes) {
        // Callers may hand over the raw result of a cache lookup, which can be null for a missing
        // entry. Propagate the null instead of failing inside the BytesArray constructor.
        if (bytes == null) {
            return null;
        }
        return new BytesArray(bytes);
    }

    /**
     * Compares a reference with a serialized representation by serializing the reference and
     * comparing the two byte arrays element by element.
     * @param object The non-serialized reference.
     * @param bytes The serialized representation to compare against.
     * @return true if the serialized form of {@code object} has the same contents as {@code bytes}.
     */
    @Override
    public boolean equals(BytesReference object, byte[] bytes) {
        return Arrays.equals(serialize(object), bytes);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.common.cache.tier;

import org.ehcache.core.spi.service.FileBasedPersistenceContext;
import org.ehcache.spi.serialization.SerializerException;
import org.opensearch.OpenSearchException;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
Expand All @@ -18,6 +20,7 @@
import org.opensearch.common.unit.TimeValue;

import java.io.File;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Iterator;
import java.util.NoSuchElementException;
Expand All @@ -40,13 +43,17 @@
import org.ehcache.expiry.ExpiryPolicy;
import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration;

/**
* @param <K> The key type of cache entries
* @param <V> The value type of cache entries
*/
public class EhCacheDiskCachingTier<K, V> implements DiskCachingTier<K, V> {

// A Cache manager can create many caches.
private final PersistentCacheManager cacheManager;

// Disk cache
private Cache<K, V> cache;
private Cache<K, byte[]> cache;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks confusing: we take the key as generic K but the value as byte[]. Ideally we need to serialize both eventually. Plus, both key and value may be of the same type, like BytesReference, String, etc., so having this hybrid doesn't make sense.

Better to keep both as byte[], or use a generic for the value as well and define a valueWrapper like the keyWrapper.

Let me know if it makes sense.

private final long maxWeightInBytes;
private final String storagePath;

Expand Down Expand Up @@ -86,11 +93,16 @@ public class EhCacheDiskCachingTier<K, V> implements DiskCachingTier<K, V> {
// will hold that many file pointers.
public final Setting<Integer> DISK_SEGMENTS;

private final Serializer<K, byte[]> keySerializer;
private final Serializer<V, byte[]> valueSerializer;

private EhCacheDiskCachingTier(Builder<K, V> builder) {
this.keyType = Objects.requireNonNull(builder.keyType, "Key type shouldn't be null");
this.valueType = Objects.requireNonNull(builder.valueType, "Value type shouldn't be null");
this.expireAfterAccess = Objects.requireNonNull(builder.expireAfterAcess, "ExpireAfterAccess value shouldn't " + "be null");
this.ehCacheEventListener = new EhCacheEventListener<K, V>();
this.keySerializer = Objects.requireNonNull(builder.keySerializer, "Key serializer shouldn't be null");
this.valueSerializer = Objects.requireNonNull(builder.valueSerializer, "Value serializer shouldn't be null");
this.ehCacheEventListener = new EhCacheEventListener<K, V>(this.valueSerializer);
this.maxWeightInBytes = builder.maxWeightInBytes;
this.storagePath = Objects.requireNonNull(builder.storagePath, "Storage path shouldn't be null");
if (builder.threadPoolAlias == null || builder.threadPoolAlias.isBlank()) {
Expand Down Expand Up @@ -124,37 +136,38 @@ private PersistentCacheManager buildCacheManager() {
.build(true);
}

private Cache<K, V> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
private Cache<K, byte[]> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
return this.cacheManager.createCache(
DISK_CACHE_ALIAS,
CacheConfigurationBuilder.newCacheConfigurationBuilder(
this.keyType,
this.valueType,
keyType,
byte[].class,
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
).withExpiry(new ExpiryPolicy<K, V>() {
).withExpiry(new ExpiryPolicy<K, byte[]>() {
@Override
public Duration getExpiryForCreation(K key, V value) {
public Duration getExpiryForCreation(K key, byte[] value) {
return INFINITE;
}

@Override
public Duration getExpiryForAccess(K key, Supplier<? extends V> value) {
public Duration getExpiryForAccess(K key, Supplier<? extends byte[]> value) {
return expireAfterAccess;
}

@Override
public Duration getExpiryForUpdate(K key, Supplier<? extends V> oldValue, V newValue) {
public Duration getExpiryForUpdate(K key, Supplier<? extends byte[]> oldValue, byte[] newValue) {
return INFINITE;
}
})
.withService(getListenerConfiguration(builder))
.withService(
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
DISK_WRITE_CONCURRENCY.get(settings),
DISK_SEGMENTS.get(settings)
)
new OffHeapDiskStoreConfiguration(
this.threadPoolAlias,
DISK_WRITE_CONCURRENCY.get(settings),
DISK_SEGMENTS.get(settings)
)
)
.withKeySerializer(new KeySerializerWrapper<K>(keySerializer))
);
}

Expand All @@ -177,12 +190,12 @@ private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder<
@Override
public V get(K key) {
// Optimize it by adding key store.
return cache.get(key);
return valueSerializer.deserialize(cache.get(key));
}

@Override
public void put(K key, V value) {
cache.put(key, value);
cache.put(key, valueSerializer.serialize(value));
}

@Override
Expand All @@ -193,7 +206,7 @@ public V computeIfAbsent(K key, TieredCacheLoader<K, V> loader) throws Exception

@Override
public void invalidate(K key) {
// There seems to be an thread leak issue while calling this and then closing cache.
// There seems to be a thread leak issue while calling this and then closing cache.
cache.remove(key);
}

Expand Down Expand Up @@ -244,18 +257,23 @@ public void close() {
* @param <K> Type of key
* @param <V> Type of value
*/
class EhCacheEventListener<K, V> implements CacheEventListener<K, V> {
class EhCacheEventListener<K, V> implements CacheEventListener<K, byte[]> {

private Optional<RemovalListener<K, V>> removalListener;
// We need to pass the value serializer to this listener, as the removal listener is expecting
// values of type K, V, not K, byte[]
private Serializer<V, byte[]> valueSerializer;

EhCacheEventListener() {}
EhCacheEventListener(Serializer<V, byte[]> valueSerializer) {
this.valueSerializer = valueSerializer;
}

public void setRemovalListener(RemovalListener<K, V> removalListener) {
this.removalListener = Optional.ofNullable(removalListener);
}

@Override
public void onEvent(CacheEvent<? extends K, ? extends V> event) {
public void onEvent(CacheEvent<? extends K, ? extends byte[]> event) {
switch (event.getType()) {
case CREATED:
count.inc();
Expand All @@ -264,7 +282,12 @@ public void onEvent(CacheEvent<? extends K, ? extends V> event) {
case EVICTED:
this.removalListener.ifPresent(
listener -> listener.onRemoval(
new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.EVICTED)
new RemovalNotification<>(
event.getKey(),
valueSerializer.deserialize(event.getOldValue()),
RemovalReason.EVICTED,
TierType.DISK
)
)
);
count.dec();
Expand All @@ -277,7 +300,12 @@ public void onEvent(CacheEvent<? extends K, ? extends V> event) {
case EXPIRED:
this.removalListener.ifPresent(
listener -> listener.onRemoval(
new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.INVALIDATED)
new RemovalNotification<>(
event.getKey(),
valueSerializer.deserialize(event.getOldValue()),
RemovalReason.INVALIDATED,
TierType.DISK
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I missed it earlier; can you also add TierType.DISK as an argument to this constructor?

)
);
count.dec();
Expand All @@ -297,9 +325,9 @@ public void onEvent(CacheEvent<? extends K, ? extends V> event) {
*/
class EhCacheKeyIterator<K> implements Iterator<K> {

Iterator<Cache.Entry<K, V>> iterator;
Iterator<Cache.Entry<K, byte[]>> iterator;

EhCacheKeyIterator(Iterator<Cache.Entry<K, V>> iterator) {
EhCacheKeyIterator(Iterator<Cache.Entry<K, byte[]>> iterator) {
this.iterator = iterator;
}

Expand All @@ -317,6 +345,40 @@ public K next() {
}
}

/**
 * Adapter that exposes a tier-level key {@link Serializer} through the Ehcache serializer SPI,
 * so that it can be passed directly to Ehcache.
 */
private class KeySerializerWrapper<K> implements org.ehcache.spi.serialization.Serializer<K> {
    public Serializer<K, byte[]> serializer;

    public KeySerializerWrapper(Serializer<K, byte[]> serializer) {
        this.serializer = serializer;
    }

    // This constructor must be present, but does not have to work as we are not actually persisting the disk
    // cache after a restart.
    // See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches
    public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {}

    /** Serializes the key with the wrapped serializer and wraps the resulting array in a buffer. */
    @Override
    public ByteBuffer serialize(K object) throws SerializerException {
        final byte[] encoded = serializer.serialize(object);
        return ByteBuffer.wrap(encoded);
    }

    /** Reads the remaining bytes of the buffer and deserializes them with the wrapped serializer. */
    @Override
    public K read(ByteBuffer binary) throws ClassNotFoundException, SerializerException {
        return serializer.deserialize(drain(binary));
    }

    /** Reads the remaining bytes of the buffer and asks the wrapped serializer to compare them to the key. */
    @Override
    public boolean equals(K object, ByteBuffer binary) throws ClassNotFoundException, SerializerException {
        return serializer.equals(object, drain(binary));
    }

    // Copies everything between the buffer's position and its limit into a fresh array,
    // advancing the buffer's position as a relative bulk get does.
    private byte[] drain(ByteBuffer buffer) {
        final byte[] contents = new byte[buffer.remaining()];
        buffer.get(contents);
        return contents;
    }
}

/**
* Builder object to build Ehcache disk tier.
* @param <K> Type of key
Expand All @@ -342,6 +404,8 @@ public static class Builder<K, V> {

// Provides capability to make ehCache event listener to run in sync mode. Used for testing too.
private boolean isEventListenerModeSync;
private Serializer<K, byte[]> keySerializer;
private Serializer<V, byte[]> valueSerializer;

public Builder() {}

Expand Down Expand Up @@ -399,6 +463,16 @@ public EhCacheDiskCachingTier.Builder<K, V> setIsEventListenerModeSync(boolean i
return this;
}

/**
 * Sets the serializer used to convert keys to and from byte arrays.
 * The tier's constructor requires a non-null key serializer, so this must be set before building.
 * @param keySerializer The key serializer to use.
 * @return This builder, for chaining.
 */
public EhCacheDiskCachingTier.Builder<K, V> setKeySerializer(Serializer<K, byte[]> keySerializer) {
this.keySerializer = keySerializer;
return this;
}

/**
 * Sets the serializer used to convert values to and from byte arrays.
 * The tier's constructor requires a non-null value serializer, so this must be set before building.
 * @param valueSerializer The value serializer to use.
 * @return This builder, for chaining.
 */
public EhCacheDiskCachingTier.Builder<K, V> setValueSerializer(Serializer<V, byte[]> valueSerializer) {
this.valueSerializer = valueSerializer;
return this;
}

public EhCacheDiskCachingTier<K, V> build() {
return new EhCacheDiskCachingTier<>(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.cache.tier;

import java.io.IOException;

/**
 * An interface for serializers, to be used in disk caching tier and elsewhere.
 * T is the class of the original object, and U is the serialized class.
 *
 * @param <T> The class of the original (non-serialized) object.
 * @param <U> The class of the serialized representation.
 */
public interface Serializer<T, U> {
/**
 * Serializes an object.
 * @param object A non-serialized object.
 * @return The serialized representation of the object.
 */
U serialize(T object);

/**
 * Deserializes a serialized representation back into an object.
 * @param bytes The serialized representation (of type U, which need not literally be a byte array).
 * @return The original object.
 */
T deserialize(U bytes);

/**
 * Compares an object to a serialized representation of an object.
 * @param object A non-serialized object
 * @param bytes Serialized representation of an object
 * @return true if representing the same object, false if not
 */
boolean equals(T object, U bytes);
}
Loading
Loading