diff --git a/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java b/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java new file mode 100644 index 0000000000000..55ffe22c2a339 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; + +import java.io.IOException; +import java.util.Arrays; + +public class BytesReferenceSerializer implements Serializer { + // This class does not get passed to ehcache itself, so it's not required that classes match after deserialization. + + public BytesReferenceSerializer() {} + @Override + public byte[] serialize(BytesReference object) { + return BytesReference.toBytes(object); + } + + @Override + public BytesReference deserialize(byte[] bytes) { + return new BytesArray(bytes); + } + + @Override + public boolean equals(BytesReference object, byte[] bytes) { + return Arrays.equals(serialize(object), bytes); + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java index e7e52c7fbccbf..ff42ac510231b 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTier.java @@ -8,6 +8,8 @@ package org.opensearch.common.cache.tier; +import org.ehcache.core.spi.service.FileBasedPersistenceContext; +import org.ehcache.spi.serialization.SerializerException; import org.opensearch.OpenSearchException; import 
org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; @@ -18,6 +20,7 @@ import org.opensearch.common.unit.TimeValue; import java.io.File; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Iterator; import java.util.NoSuchElementException; @@ -40,13 +43,17 @@ import org.ehcache.expiry.ExpiryPolicy; import org.ehcache.impl.config.store.disk.OffHeapDiskStoreConfiguration; +/** + * @param The key type of cache entries + * @param The value type of cache entries + */ public class EhCacheDiskCachingTier implements DiskCachingTier { // A Cache manager can create many caches. private final PersistentCacheManager cacheManager; // Disk cache - private Cache cache; + private Cache cache; private final long maxWeightInBytes; private final String storagePath; @@ -86,11 +93,16 @@ public class EhCacheDiskCachingTier implements DiskCachingTier { // will hold that many file pointers. public final Setting DISK_SEGMENTS; + private final Serializer keySerializer; + private final Serializer valueSerializer; + private EhCacheDiskCachingTier(Builder builder) { this.keyType = Objects.requireNonNull(builder.keyType, "Key type shouldn't be null"); this.valueType = Objects.requireNonNull(builder.valueType, "Value type shouldn't be null"); this.expireAfterAccess = Objects.requireNonNull(builder.expireAfterAcess, "ExpireAfterAccess value shouldn't " + "be null"); - this.ehCacheEventListener = new EhCacheEventListener(); + this.keySerializer = Objects.requireNonNull(builder.keySerializer, "Key serializer shouldn't be null"); + this.valueSerializer = Objects.requireNonNull(builder.valueSerializer, "Value serializer shouldn't be null"); + this.ehCacheEventListener = new EhCacheEventListener(this.valueSerializer); this.maxWeightInBytes = builder.maxWeightInBytes; this.storagePath = Objects.requireNonNull(builder.storagePath, "Storage path shouldn't be null"); if (builder.threadPoolAlias == null || 
builder.threadPoolAlias.isBlank()) { @@ -124,37 +136,38 @@ private PersistentCacheManager buildCacheManager() { .build(true); } - private Cache buildCache(Duration expireAfterAccess, Builder builder) { + private Cache buildCache(Duration expireAfterAccess, Builder builder) { return this.cacheManager.createCache( DISK_CACHE_ALIAS, CacheConfigurationBuilder.newCacheConfigurationBuilder( - this.keyType, - this.valueType, + keyType, + byte[].class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B) - ).withExpiry(new ExpiryPolicy() { + ).withExpiry(new ExpiryPolicy() { @Override - public Duration getExpiryForCreation(K key, V value) { + public Duration getExpiryForCreation(K key, byte[] value) { return INFINITE; } @Override - public Duration getExpiryForAccess(K key, Supplier value) { + public Duration getExpiryForAccess(K key, Supplier value) { return expireAfterAccess; } @Override - public Duration getExpiryForUpdate(K key, Supplier oldValue, V newValue) { + public Duration getExpiryForUpdate(K key, Supplier oldValue, byte[] newValue) { return INFINITE; } }) .withService(getListenerConfiguration(builder)) .withService( - new OffHeapDiskStoreConfiguration( - this.threadPoolAlias, - DISK_WRITE_CONCURRENCY.get(settings), - DISK_SEGMENTS.get(settings) - ) + new OffHeapDiskStoreConfiguration( + this.threadPoolAlias, + DISK_WRITE_CONCURRENCY.get(settings), + DISK_SEGMENTS.get(settings) ) + ) + .withKeySerializer(new KeySerializerWrapper(keySerializer)) ); } @@ -177,12 +190,12 @@ private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder< @Override public V get(K key) { // Optimize it by adding key store. 
- return cache.get(key); + return valueSerializer.deserialize(cache.get(key)); } @Override public void put(K key, V value) { - cache.put(key, value); + cache.put(key, valueSerializer.serialize(value)); } @Override @@ -193,7 +206,7 @@ public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception @Override public void invalidate(K key) { - // There seems to be an thread leak issue while calling this and then closing cache. + // There seems to be a thread leak issue while calling this and then closing cache. cache.remove(key); } @@ -244,18 +257,23 @@ public void close() { * @param Type of key * @param Type of value */ - class EhCacheEventListener implements CacheEventListener { + class EhCacheEventListener implements CacheEventListener { private Optional> removalListener; + // We need to pass the value serializer to this listener, as the removal listener is expecting + // values of type K, V, not K, byte[] + private Serializer valueSerializer; - EhCacheEventListener() {} + EhCacheEventListener(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + } public void setRemovalListener(RemovalListener removalListener) { this.removalListener = Optional.ofNullable(removalListener); } @Override - public void onEvent(CacheEvent event) { + public void onEvent(CacheEvent event) { switch (event.getType()) { case CREATED: count.inc(); @@ -264,7 +282,12 @@ public void onEvent(CacheEvent event) { case EVICTED: this.removalListener.ifPresent( listener -> listener.onRemoval( - new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.EVICTED) + new RemovalNotification<>( + event.getKey(), + valueSerializer.deserialize(event.getOldValue()), + RemovalReason.EVICTED, + TierType.DISK + ) ) ); count.dec(); @@ -277,7 +300,12 @@ public void onEvent(CacheEvent event) { case EXPIRED: this.removalListener.ifPresent( listener -> listener.onRemoval( - new RemovalNotification<>(event.getKey(), event.getOldValue(), RemovalReason.INVALIDATED) + new 
RemovalNotification<>( + event.getKey(), + valueSerializer.deserialize(event.getOldValue()), + RemovalReason.INVALIDATED, + TierType.DISK + ) ) ); count.dec(); @@ -297,9 +325,9 @@ public void onEvent(CacheEvent event) { */ class EhCacheKeyIterator implements Iterator { - Iterator> iterator; + Iterator> iterator; - EhCacheKeyIterator(Iterator> iterator) { + EhCacheKeyIterator(Iterator> iterator) { this.iterator = iterator; } @@ -317,6 +345,40 @@ public K next() { } } + /** + * The wrapper for the key serializer which is passed directly to Ehcache. + */ + private class KeySerializerWrapper implements org.ehcache.spi.serialization.Serializer { + public Serializer serializer; + public KeySerializerWrapper(Serializer serializer) { + this.serializer = serializer; + } + + // This constructor must be present, but does not have to work as we are not actually persisting the disk + // cache after a restart. + // See https://www.ehcache.org/documentation/3.0/serializers-copiers.html#persistent-vs-transient-caches + public KeySerializerWrapper(ClassLoader classLoader, FileBasedPersistenceContext persistenceContext) {} + + @Override + public ByteBuffer serialize(K object) throws SerializerException { + return ByteBuffer.wrap(serializer.serialize(object)); + } + + @Override + public K read(ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return serializer.deserialize(arr); + } + + @Override + public boolean equals(K object, ByteBuffer binary) throws ClassNotFoundException, SerializerException { + byte[] arr = new byte[binary.remaining()]; + binary.get(arr); + return serializer.equals(object, arr); + } + } + /** * Builder object to build Ehcache disk tier. * @param Type of key @@ -342,6 +404,8 @@ public static class Builder { // Provides capability to make ehCache event listener to run in sync mode. Used for testing too. 
private boolean isEventListenerModeSync; + private Serializer keySerializer; + private Serializer valueSerializer; public Builder() {} @@ -399,6 +463,16 @@ public EhCacheDiskCachingTier.Builder setIsEventListenerModeSync(boolean i return this; } + public EhCacheDiskCachingTier.Builder setKeySerializer(Serializer keySerializer) { + this.keySerializer = keySerializer; + return this; + } + + public EhCacheDiskCachingTier.Builder setValueSerializer(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + return this; + } + public EhCacheDiskCachingTier build() { return new EhCacheDiskCachingTier<>(this); } diff --git a/server/src/main/java/org/opensearch/common/cache/tier/Serializer.java b/server/src/main/java/org/opensearch/common/cache/tier/Serializer.java new file mode 100644 index 0000000000000..74c256d188682 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/Serializer.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import java.io.IOException; + +/** + * An interface for serializers, to be used in disk caching tier and elsewhere. + * T is the class of the original object, and U is the serialized class. + */ +public interface Serializer { + /** + * Serializes an object. + * @param object A non-serialized object. + * @return The serialized representation of the object. + */ + U serialize(T object); + + /** + * Deserializes bytes into an object. + * @param bytes The serialized representation. + * @return The original object. + */ + T deserialize(U bytes); + + /** + * Compares an object to a serialized representation of an object. 
+ * @param object A non-serialized object + * @param bytes Serialized representation of an object + * @return true if representing the same object, false if not + */ + boolean equals(T object, U bytes); +} diff --git a/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java b/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java new file mode 100644 index 0000000000000..2fc9c7cbb2756 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.Randomness; +import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.bytes.CompositeBytesReference; +import org.opensearch.core.common.util.ByteArray; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Random; + +public class BytesReferenceSerializerTests extends OpenSearchTestCase { + public void testEquality() throws Exception { + BytesReferenceSerializer ser = new BytesReferenceSerializer(); + // Test that values are equal before and after serialization, for each implementation of BytesReference. 
+ byte[] bytesValue = new byte[1000]; + Random rand = Randomness.get(); + rand.nextBytes(bytesValue); + + BytesReference ba = new BytesArray(bytesValue); + byte[] serialized = ser.serialize(ba); + assertTrue(ser.equals(ba, serialized)); + BytesReference deserialized = ser.deserialize(serialized); + assertEquals(ba, deserialized); + + BytesReference cbr = CompositeBytesReference.of(new BytesArray(bytesValue), new BytesArray(bytesValue)); + serialized = ser.serialize(cbr); + assertTrue(ser.equals(cbr, serialized)); + deserialized = ser.deserialize(serialized); + assertEquals(cbr, deserialized); + + // We need the PagedBytesReference to be larger than the page size (16 KB) in order to actually create it + byte[] pbrValue = new byte[PageCacheRecycler.PAGE_SIZE_IN_BYTES * 2]; + rand.nextBytes(pbrValue); + ByteArray arr = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(pbrValue.length); + arr.set(0L, pbrValue, 0, pbrValue.length); + assert !arr.hasArray(); + BytesReference pbr = BytesReference.fromByteArray(arr, pbrValue.length); + serialized = ser.serialize(pbr); + assertTrue(ser.equals(pbr, serialized)); + deserialized = ser.deserialize(serialized); + assertEquals(pbr, deserialized); + + BytesReference rbr = new ReleasableBytesReference(new BytesArray(bytesValue), ReleasableBytesReference.NO_OP); + serialized = ser.serialize(rbr); + assertTrue(ser.equals(rbr, serialized)); + deserialized = ser.deserialize(serialized); + assertEquals(rbr, deserialized); + } +} diff --git a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java index 804f236264daa..2a9f1aca6ecb5 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java @@ -8,16 +8,22 @@ package org.opensearch.common.cache.tier; +import org.opensearch.common.Randomness; 
import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.env.NodeEnvironment; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; @@ -40,6 +46,8 @@ public void testBasicGetAndPut() throws IOException { .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setSettingPrefix(SETTING_PREFIX) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .build(); int randomKeys = randomIntBetween(10, 100); Map keyValueMap = new HashMap<>(); @@ -57,6 +65,41 @@ public void testBasicGetAndPut() throws IOException { } } + public void testBasicGetAndPutBytesReference() throws Exception { + Settings settings = Settings.builder().build(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + EhCacheDiskCachingTier ehCacheDiskCachingTier = new EhCacheDiskCachingTier.Builder() + .setKeyType(String.class) + .setValueType(BytesReference.class) + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setSettings(settings) + .setThreadPoolAlias("ehcacheTest") + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 2) + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setSettingPrefix(SETTING_PREFIX) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new BytesReferenceSerializer()) + .build(); + int randomKeys = randomIntBetween(10, 100); + int valueLength = 1000; + Random rand = Randomness.get(); + Map keyValueMap = new 
HashMap<>(); + for (int i = 0; i < randomKeys; i++) { + byte[] valueBytes = new byte[valueLength]; + rand.nextBytes(valueBytes); + keyValueMap.put(UUID.randomUUID().toString(), new BytesArray(valueBytes)); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + ehCacheDiskCachingTier.put(entry.getKey(), entry.getValue()); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + BytesReference value = ehCacheDiskCachingTier.get(entry.getKey()); + assertEquals(entry.getValue(), value); + } + ehCacheDiskCachingTier.close(); + } + } + public void testConcurrentPut() throws Exception { Settings settings = Settings.builder().build(); try (NodeEnvironment env = newNodeEnvironment(settings)) { @@ -69,6 +112,8 @@ public void testConcurrentPut() throws Exception { .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setSettingPrefix(SETTING_PREFIX) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .build(); int randomKeys = randomIntBetween(20, 100); Thread[] threads = new Thread[randomKeys]; @@ -111,6 +156,8 @@ public void testEhcacheParallelGets() throws Exception { .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") .setSettingPrefix(SETTING_PREFIX) .setIsEventListenerModeSync(true) // For accurate count + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .build(); ehCacheDiskCachingTierNew.setRemovalListener(removalListener(new AtomicInteger())); int randomKeys = randomIntBetween(20, 100); @@ -153,6 +200,8 @@ public void testEhcacheKeyIterator() throws Exception { .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) .setSettingPrefix(SETTING_PREFIX) .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .build(); int randomKeys = randomIntBetween(2, 2); @@ -187,6 +236,8 @@ public void 
testCompute() throws Exception { .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) .setSettingPrefix(SETTING_PREFIX) .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) .build(); // For now it is unsupported. assertThrows( @@ -223,4 +274,23 @@ public boolean isLoaded() { private RemovalListener removalListener(AtomicInteger counter) { return notification -> counter.incrementAndGet(); } + + private static class StringSerializer implements Serializer { + + private final Charset charset = StandardCharsets.UTF_8; + @Override + public byte[] serialize(String object) { + return object.getBytes(charset); + } + + @Override + public String deserialize(byte[] bytes) { + return new String(bytes, charset); + } + + @Override + public boolean equals(String object, byte[] bytes) { + return object.equals(deserialize(bytes)); + } + } }