Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stats rework (1/3): Interfaces and implementations for tiers #24

Open
wants to merge 34 commits into
base: ehcache-disk-integ-base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
641abf1
CacheStats interface and SingleDimensionCacheStats impl
Feb 8, 2024
937ca51
First attempt to integrate new stats with EhcacheDiskCache
Feb 8, 2024
21b1078
Adds Serializer interface and impls for Key and BytesReference
Jan 9, 2024
a460acd
Adds ICacheKey serializer impl
Feb 9, 2024
31ee9ae
Added more icachekey serializer tests
Feb 9, 2024
9069077
Attempts to fix ehcache hits
Feb 9, 2024
18ddb23
Fixed unexpected misses by implementing hashCode for ICacheKey and Ca…
Feb 9, 2024
8618458
Adds memory tracking to disk tier
Feb 12, 2024
46b3e49
Updated other tests
Feb 12, 2024
51ea583
Added partial memory tests
Feb 12, 2024
17cf26a
Changed cache stats API to get values by dimension list
Feb 12, 2024
be5eece
Cleanup
Feb 13, 2024
a6b0899
Redid memory tracking to use already implemented weigher fn
Feb 13, 2024
9b1e66c
Split CacheStats into CacheStats and CacheStatsBase, which can't upda…
Feb 13, 2024
01e04a8
Readded BytesReferenceSerializer impl as ICacheKeySerializerTests dep…
Feb 13, 2024
3777e3f
Changed SingleDimensionCacheStats to use ConcurrentMap
Feb 14, 2024
6059680
Made SingleDimensionCacheStats also take in tier dimensions
Feb 14, 2024
7f5a455
Added overall CacheStatsResponse object packaging all 5 metrics
Feb 14, 2024
36c600d
Removed skeleton TSC stats implementation
Feb 14, 2024
27fdfe1
Merge remote-tracking branch 'sgup432/ehcache_disk_integ' into rework…
Feb 26, 2024
c8dc1b3
Modified factories to take new arguments
Feb 26, 2024
4abd602
added utilitly fns to CacheStatsResponse
Feb 27, 2024
2586fa1
Making TieredCachePlugin constructor public
sgup432 Feb 27, 2024
6a2b374
Fixing CacheService unit test
sgup432 Feb 27, 2024
3e7ea26
Added multi dimension cache stats impl
Feb 27, 2024
b4c83e7
Removed SingleDimensionCacheStats
Feb 27, 2024
d579c51
Adds IRC key serializer
Feb 27, 2024
c34c218
Addressed Sagar's other comment
Feb 27, 2024
2f4bd4f
Merge remote-tracking branch 'sgup432/ehcache_disk_integ' into rework…
Feb 28, 2024
a61f033
Fixed on heap stats integration, added tests
Feb 29, 2024
2483981
Optimized multi dimension stats
Mar 1, 2024
2aeaa53
Added reset() to CacheStats
Mar 1, 2024
60df761
Fixed reset impl for multi dim stats
Mar 1, 2024
ea33af8
spotlessApply
Mar 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.stats.CacheStats;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -44,7 +46,14 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
private final RemovalListener<K, V> removalListener;

// TODO: Listeners for removals from the two tiers
// private final RemovalListener<ICacheKey<K>, V> onDiskRemovalListener;
// private final RemovalListener<ICacheKey<K>, V> onHeapRemovalListener;

// The listener for removals from the spillover cache as a whole
private final RemovalListener<ICacheKey<K>, V> removalListener;
private final CacheStats stats;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
Expand All @@ -58,27 +67,27 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<K, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
}
removalListener.onRemoval(notification);
this.onHeapCache = builder.onHeapCacheFactory.create(new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<>() {
@Override
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
}
})
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.build(),
removalListener.onRemoval(notification);
}
})
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.build(),
builder.cacheType,
builder.cacheFactories

);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);
this.stats = null; // TODO
}

// Package private for testing
Expand All @@ -92,20 +101,19 @@ ICache<K, V> getDiskCache() {
}

@Override
public V get(K key) {
public V get(ICacheKey<K> key) {
return getValueFromTieredCache().apply(key);
}

@Override
public void put(K key, V value) {
public void put(ICacheKey<K> key, V value) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
}
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {

public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
V cacheValue = getValueFromTieredCache().apply(key);
if (cacheValue == null) {
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
Expand All @@ -121,7 +129,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Except
}

@Override
public void invalidate(K key) {
public void invalidate(ICacheKey<K> key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also trigger a hit/miss listener event, so ignoring it for now.
Expand All @@ -147,7 +155,7 @@ public void invalidateAll() {
*/
@SuppressWarnings("unchecked")
@Override
public Iterable<K> keys() {
public Iterable<ICacheKey<K>> keys() {
return Iterables.concat(onHeapCache.keys(), diskCache.keys());
}

Expand Down Expand Up @@ -176,7 +184,11 @@ public void close() throws IOException {
}
}

private Function<K, V> getValueFromTieredCache() {
public CacheStats stats() {
return stats;
}

private Function<ICacheKey<K>, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
Expand Down Expand Up @@ -254,7 +266,7 @@ public String getCacheName() {
public static class Builder<K, V> {
private ICache.Factory onHeapCacheFactory;
private ICache.Factory diskCacheFactory;
private RemovalListener<K, V> removalListener;
private RemovalListener<ICacheKey<K>, V> removalListener;
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;
Expand Down Expand Up @@ -289,7 +301,7 @@ public Builder<K, V> setDiskCacheFactory(ICache.Factory diskCacheFactory) {
* @param removalListener Removal listener
* @return builder
*/
public Builder<K, V> setRemovalListener(RemovalListener<K, V> removalListener) {
public Builder<K, V> setRemovalListener(RemovalListener<ICacheKey<K>, V> removalListener) {
this.removalListener = removalListener;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class TieredSpilloverCachePlugin extends Plugin implements CachePlugin {
/**
* Default constructor
*/
TieredSpilloverCachePlugin() {}
public TieredSpilloverCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.stats.CacheStats;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;
Expand All @@ -23,22 +24,14 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY;

public class TieredSpilloverCacheTests extends OpenSearchTestCase {

public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception {
// TODO: These tests are uncommented in the second stats rework PR, which adds a TSC stats implementation
/*public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception {
int onHeapCacheSize = randomIntBetween(10, 30);
int keyValueSize = 50;

Expand Down Expand Up @@ -803,7 +796,7 @@ public String load(String key) {
});
thread.start();
assertBusy(() -> { assertTrue(loadAwareCacheLoader.isLoaded()); }, 100, TimeUnit.MILLISECONDS); // We wait for new key to be loaded
// after which it eviction flow is
// after which it eviction flow is
// guaranteed to occur.
ICache<String, String> onDiskCache = tieredSpilloverCache.getDiskCache();

Expand Down Expand Up @@ -879,7 +872,7 @@ private TieredSpilloverCache<String, String> intializeTieredSpilloverCache(
.setDiskCacheFactory(mockDiskCacheFactory)
.setCacheConfig(cacheConfig)
.build();
}
}*/
}

/**
Expand All @@ -896,7 +889,7 @@ public OpenSearchOnHeapCacheWrapper(Builder<K, V> builder) {
}

@Override
public V get(K key) {
public V get(ICacheKey<K> key) {
V value = super.get(key);
if (value != null) {
statsHolder.hitCount.inc();
Expand All @@ -907,13 +900,13 @@ public V get(K key) {
}

@Override
public void put(K key, V value) {
public void put(ICacheKey<K> key, V value) {
super.put(key, value);
statsHolder.onCachedMetric.inc();
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
V value = super.computeIfAbsent(key, loader);
if (loader.isLoaded()) {
statsHolder.missCount.inc();
Expand All @@ -925,7 +918,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Except
}

@Override
public void invalidate(K key) {
public void invalidate(ICacheKey<K> key) {
super.invalidate(key);
}

Expand All @@ -935,7 +928,7 @@ public void invalidateAll() {
}

@Override
public Iterable<K> keys() {
public Iterable<ICacheKey<K>> keys() {
return super.keys();
}

Expand All @@ -953,7 +946,7 @@ public void refresh() {
public void close() {}

@Override
public void onRemoval(RemovalNotification<K, V> notification) {
public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
super.onRemoval(notification);
}

Expand Down Expand Up @@ -989,24 +982,25 @@ class StatsHolder {

class MockOnDiskCache<K, V> implements ICache<K, V> {

Map<K, V> cache;
Map<ICacheKey<K>, V> cache;
int maxSize;
long delay;
CacheStats stats = null; // TODO

MockOnDiskCache(int maxSize, long delay) {
this.maxSize = maxSize;
this.delay = delay;
this.cache = new ConcurrentHashMap<K, V>();
this.cache = new ConcurrentHashMap<ICacheKey<K>, V>();
}

@Override
public V get(K key) {
public V get(ICacheKey<K> key) {
V value = cache.get(key);
return value;
}

@Override
public void put(K key, V value) {
public void put(ICacheKey<K> key, V value) {
if (this.cache.size() >= maxSize) { // For simplification
// eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, value, RemovalReason.EVICTED,
// CacheStoreType.DISK));
Expand All @@ -1022,7 +1016,7 @@ public void put(K key, V value) {
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
V value = cache.computeIfAbsent(key, key1 -> {
try {
return loader.load(key);
Expand All @@ -1040,7 +1034,7 @@ public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Except
}

@Override
public void invalidate(K key) {
public void invalidate(ICacheKey<K> key) {
if (this.cache.containsKey(key)) {
// eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, null, RemovalReason.INVALIDATED, CacheStoreType.DISK));
}
Expand All @@ -1053,7 +1047,7 @@ public void invalidateAll() {
}

@Override
public Iterable<K> keys() {
public Iterable<ICacheKey<K>> keys() {
return this.cache.keySet();
}

Expand All @@ -1065,6 +1059,11 @@ public long count() {
@Override
public void refresh() {}

@Override
public CacheStats stats() {
return stats;
}

@Override
public void close() {

Expand Down
Loading
Loading