From dcb9fad0cc76162d61045d4ef87638f33d8a7698 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 11 Oct 2023 14:17:42 -0700 Subject: [PATCH] thread pool stuff --- .../offheap-disk-store/ehcache-disk-store.meta | 2 +- .../indices/EhcacheDiskCachingTier.java | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/server/disk_cache_tier/file/diskTier_8a0816fa8975d40fe56fbdb0d44b485392d49431/offheap-disk-store/ehcache-disk-store.meta b/server/disk_cache_tier/file/diskTier_8a0816fa8975d40fe56fbdb0d44b485392d49431/offheap-disk-store/ehcache-disk-store.meta index 2d3e90d195f10..3165bdaf28d86 100644 --- a/server/disk_cache_tier/file/diskTier_8a0816fa8975d40fe56fbdb0d44b485392d49431/offheap-disk-store/ehcache-disk-store.meta +++ b/server/disk_cache_tier/file/diskTier_8a0816fa8975d40fe56fbdb0d44b485392d49431/offheap-disk-store/ehcache-disk-store.meta @@ -1,4 +1,4 @@ #Key and value types -#Tue Oct 10 19:13:45 EDT 2023 +#Wed Oct 11 23:48:40 GMT+03:00 2023 valueType=org.opensearch.core.common.bytes.BytesReference keyType=org.opensearch.indices.IndicesRequestCache$Key diff --git a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java index 3f9057c436be0..b28c159ffda17 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java @@ -11,6 +11,7 @@ import org.ehcache.config.builders.CacheConfigurationBuilder; import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder; import org.ehcache.config.builders.CacheManagerBuilder; +import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder; import org.ehcache.config.builders.ResourcePoolsBuilder; import org.ehcache.config.units.MemoryUnit; import org.ehcache.core.internal.statistics.DefaultStatisticsService; @@ -20,6 +21,7 @@ import org.ehcache.event.CacheEvent; import org.ehcache.event.CacheEventListener; import org.ehcache.event.EventType; +import org.ehcache.impl.config.executor.PooledExecutionServiceConfiguration; import org.opensearch.common.ExponentiallyWeightedMovingAverage; import org.opensearch.common.cache.RemovalListener; import org.ehcache.Cache; @@ -28,7 +30,6 @@ import org.opensearch.common.cache.RemovalReason; import java.util.Collections; -import java.util.Map; public class EhcacheDiskCachingTier implements CachingTier, RemovalListener { @@ -39,10 +40,11 @@ public class EhcacheDiskCachingTier implements CachingTier, RemovalL private final Class valueType; private final String DISK_CACHE_FP = "disk_cache_tier"; // this should probably be defined somewhere else since we need to change security.policy based on its value private RemovalListener removalListener; - private final StatisticsService statsService; private final CacheStatistics cacheStats; private ExponentiallyWeightedMovingAverage getTimeMillisEWMA; private static final double GET_TIME_EWMA_ALPHA = 0.3; // This is the value used elsewhere in the code + private static final int MIN_WRITE_THREADS = 0; + private static final int MAX_WRITE_THREADS = 4; // Max number of threads for the PooledExecutionService which handles writes // private RBMIntKeyLookupStore keystore; // private CacheTierPolicy[] policies; // private IndicesRequestCacheDiskTierPolicy policy; @@ -52,7 +54,7 @@ public EhcacheDiskCachingTier(long maxWeightInBytes, long maxKeystoreWeightInByt this.keyType = keyType; this.valueType = valueType; String cacheAlias = "diskTier"; - this.statsService = new DefaultStatisticsService(); + StatisticsService statsService = new DefaultStatisticsService(); // our EhcacheEventListener should receive events every time an entry is removed for some reason, but not when it's created CacheEventListenerConfigurationBuilder listenerConfig = CacheEventListenerConfigurationBuilder @@ -63,12 +65,18 @@ public EhcacheDiskCachingTier(long maxWeightInBytes, long maxKeystoreWeightInByt EventType.UPDATED) .ordered().asynchronous(); // ordered() has some performance penalty as compared to unordered(), we can also use synchronous() + // test PooledExecutionService so we know what we might need to replicate + PooledExecutionServiceConfiguration threadConfig = PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder() + .defaultPool("default", MIN_WRITE_THREADS, MAX_WRITE_THREADS) + .build(); + this.cacheManager = CacheManagerBuilder.newCacheManagerBuilder() .using(statsService) + .using(threadConfig) .with(CacheManagerBuilder.persistence(DISK_CACHE_FP)) .withCache(cacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder( keyType, valueType, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, true)) - .withService(listenerConfig) + .withService(listenerConfig) // stackoverflow shows .add(), but IDE says this is deprecated. idk ).build(true); this.cache = cacheManager.getCache(cacheAlias, keyType, valueType); this.cacheStats = statsService.getCacheStatistics(cacheAlias);