From df8b26eac975363e8d3a6209d3534e0efaa9f765 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 29 Sep 2023 09:42:13 -0700 Subject: [PATCH 1/2] [Tiered Caching] Enabling serialization for IndicesRequestCache key object Signed-off-by: Sagar Upadhyaya --- .../indices/IndicesRequestCacheIT.java | 40 +++++++++++ .../index/OpenSearchDirectoryReader.java | 52 +++++++++++++- .../indices/IndicesRequestCache.java | 72 +++++++++++++------ .../opensearch/indices/IndicesService.java | 21 ++++-- .../indices/IndicesRequestCacheTests.java | 72 ++++++++++++++----- .../indices/IndicesServiceCloseTests.java | 7 ++ 6 files changed, 222 insertions(+), 42 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 98a22717019cf..a1815d9be2daf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -634,6 +634,45 @@ public void testProfileDisableCache() throws Exception { } } + public void testCacheWithInvalidation() throws Exception { + Client client = client(); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get() + ); + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + OpenSearchAssertions.assertAllSuccessful(resp); + assertThat(resp.getHits().getTotalHits().value, equalTo(1L)); + + assertCacheState(client, "index", 0, 1); + // Index but don't refresh + indexRandom(false, client.prepareIndex("index").setSource("k", "hello2")); + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect hit as here as refresh didn't happen + assertCacheState(client, "index", 1, 1); + + // Explicit refresh would invalidate cache + refresh(); + // Hit same query again + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) + assertCacheState(client, "index", 1, 2); + } + private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { RequestCacheStats requestCacheStats = client.admin() .indices() @@ -648,6 +687,7 @@ private static void assertCacheState(Client client, String index, long expectedH Arrays.asList(expectedHits, expectedMisses, 0L), Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()) ); + } } diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index 77609822d3d90..e5038436012dd 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -39,6 +39,7 @@ import org.opensearch.core.index.shard.ShardId; import java.io.IOException; +import java.util.UUID; /** * A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes @@ -51,11 +52,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader { private final ShardId shardId; private final FilterDirectoryReader.SubReaderWrapper wrapper; + private DelegatingCacheHelper delegatingCacheHelper; + private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId) throws IOException { super(in, wrapper); this.wrapper = wrapper; this.shardId = shardId; + this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper()); } /** @@ -68,7 +72,53 @@ public ShardId shardId() { @Override public CacheHelper getReaderCacheHelper() { // safe to delegate since this reader does not alter the index - return in.getReaderCacheHelper(); + return this.delegatingCacheHelper; + } + + public DelegatingCacheHelper getDelegatingCacheHelper() { + return this.delegatingCacheHelper; + } + + public class DelegatingCacheHelper implements CacheHelper { + CacheHelper cacheHelper; + DelegatingCacheKey serializableCacheKey; + + DelegatingCacheHelper(CacheHelper cacheHelper) { + this.cacheHelper = cacheHelper; + this.serializableCacheKey = new DelegatingCacheKey(cacheHelper.getKey()); + } + + @Override + public CacheKey getKey() { + return this.cacheHelper.getKey(); + } + + public DelegatingCacheKey getDelegatingCacheKey() { + return this.serializableCacheKey; + } + + @Override + public void addClosedListener(ClosedListener listener) { + this.cacheHelper.addClosedListener(listener); + } + } + + public class DelegatingCacheKey { + CacheKey cacheKey; + private final UUID uniqueId; + + DelegatingCacheKey(CacheKey cacheKey) { + this.cacheKey = cacheKey; + this.uniqueId = UUID.randomUUID(); + } + + public CacheKey getCacheKey() { + return this.cacheKey; + } + + public UUID getId() { + return uniqueId; + } } @Override diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 629cea102a8b2..25a067bdc8d0c 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -51,6 +51,9 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; import java.io.Closeable; @@ -108,8 +111,9 @@ public final class IndicesRequestCache implements RemovalListener cache; + private final IndicesService indicesService; - IndicesRequestCache(Settings settings) { + IndicesRequestCache(Settings settings, IndicesService indicesService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); @@ -121,6 +125,7 @@ public final class IndicesRequestCache implements RemovalListener { + protected static class Loader implements CacheLoader { private final CacheEntity entity; private final CheckedSupplier loader; @@ -207,7 +225,7 @@ public BytesReference load(Key key) throws Exception { /** * Basic interface to make this cache testable. */ - interface CacheEntity extends Accountable { + interface CacheEntity extends Accountable, Writeable { /** * Called after the value was loaded. @@ -240,6 +258,7 @@ interface CacheEntity extends Accountable { * Called when this entity instance is removed */ void onRemoval(RemovalNotification notification); + } /** @@ -247,17 +266,23 @@ interface CacheEntity extends Accountable { * * @opensearch.internal */ - static class Key implements Accountable { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); + class Key implements Accountable, Writeable { + private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality - public final IndexReader.CacheKey readerCacheKey; + public final String readerCacheKeyUniqueId; public final BytesReference value; - Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) { + Key(CacheEntity entity, BytesReference value, String readerCacheKeyUniqueId) { this.entity = entity; - this.readerCacheKey = Objects.requireNonNull(readerCacheKey); this.value = value; + this.readerCacheKeyUniqueId = Objects.requireNonNull(readerCacheKeyUniqueId); + } + + Key(StreamInput in) throws IOException { + this.entity = in.readOptionalWriteable(in1 -> indicesService.new IndexShardCacheEntity(in1)); + this.readerCacheKeyUniqueId = in.readOptionalString(); + this.value = in.readBytesReference(); } @Override @@ -276,7 +301,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false; + if (Objects.equals(readerCacheKeyUniqueId, key.readerCacheKeyUniqueId) == false) return false; if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; if (!value.equals(key.value)) return false; return true; @@ -285,19 +310,26 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + readerCacheKey.hashCode(); + result = 31 * result + readerCacheKeyUniqueId.hashCode(); result = 31 * result + value.hashCode(); return result; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(entity); + out.writeOptionalString(readerCacheKeyUniqueId); + out.writeBytesReference(value); + } } private class CleanupKey implements IndexReader.ClosedListener { final CacheEntity entity; - final IndexReader.CacheKey readerCacheKey; + final String readerCacheKeyUniqueId; - private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) { + private CleanupKey(CacheEntity entity, String readerCacheKeyUniqueId) { this.entity = entity; - this.readerCacheKey = readerCacheKey; + this.readerCacheKeyUniqueId = readerCacheKeyUniqueId; } @Override @@ -315,7 +347,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false; + if (Objects.equals(readerCacheKeyUniqueId, that.readerCacheKeyUniqueId) == false) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } @@ -323,7 +355,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Objects.hashCode(readerCacheKey); + result = 31 * result + Objects.hashCode(readerCacheKeyUniqueId); return result; } } @@ -336,7 +368,7 @@ synchronized void cleanCache() { for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKeyUniqueId == null || cleanupKey.entity.isOpen() == false) { // null indicates full cleanup, as does a closed shard currentFullClean.add(cleanupKey.entity.getCacheIdentity()); } else { @@ -349,7 +381,7 @@ synchronized void cleanCache() { if (currentFullClean.contains(key.entity.getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) { + if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyUniqueId))) { iterator.remove(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index a72142e65c5e8..f5e71327b6e7b 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -391,7 +391,7 @@ public IndicesService( this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); this.analysisRegistry = analysisRegistry; this.indexNameExpressionResolver = indexNameExpressionResolver; - this.indicesRequestCache = new IndicesRequestCache(settings); + this.indicesRequestCache = new IndicesRequestCache(settings, this); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; @@ -1746,14 +1746,21 @@ private BytesReference cacheShardLevelResult( * * @opensearch.internal */ - static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); + public final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); private final IndexShard indexShard; - protected IndexShardCacheEntity(IndexShard indexShard) { + public IndexShardCacheEntity(IndexShard indexShard) { this.indexShard = indexShard; } + public IndexShardCacheEntity(StreamInput in) throws IOException { + Index index = in.readOptionalWriteable(Index::new); + int shardId = in.readVInt(); + IndexService indexService = indices.get(index.getUUID()); + this.indexShard = Optional.ofNullable(indexService).map(indexService1 -> indexService1.getShard(shardId)).orElse(null); + } + @Override protected ShardRequestCache stats() { return indexShard.requestCache(); @@ -1775,6 +1782,12 @@ public long ramBytesUsed() { // across many entities return BASE_RAM_BYTES_USED; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(indexShard.shardId().getIndex()); + out.writeVInt(indexShard.shardId().id()); + } } @FunctionalInterface diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 8494259c8fd8a..664865f21f3a8 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -52,23 +52,28 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentHelper; +import org.opensearch.index.IndexService; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.query.TermQueryBuilder; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; import java.util.Arrays; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -public class IndicesRequestCacheTests extends OpenSearchTestCase { +public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { public void testBasicOperationsCache() throws Exception { ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -122,7 +127,7 @@ public void testBasicOperationsCache() throws Exception { } public void testCacheDifferentReaders() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); @@ -218,7 +223,7 @@ public void testCacheDifferentReaders() throws Exception { public void testEviction() throws Exception { final ByteSizeValue size; { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); @@ -244,7 +249,8 @@ public void testEviction() throws Exception { IOUtils.close(reader, secondReader, writer, dir, cache); } IndicesRequestCache cache = new IndicesRequestCache( - Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build() + Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), + null ); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); @@ -281,7 +287,7 @@ public void testEviction() throws Exception { } public void testClearAllEntityIdentity() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); AtomicBoolean indexShard = new AtomicBoolean(true); ShardRequestCache requestCacheStats = new ShardRequestCache(); @@ -366,7 +372,7 @@ public BytesReference get() { public void testInvalidate() throws Exception { ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -435,20 +441,23 @@ public void testInvalidate() throws Exception { public void testEqualsKey() throws IOException { AtomicBoolean trueBoolean = new AtomicBoolean(true); AtomicBoolean falseBoolean = new AtomicBoolean(false); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; Directory dir = newDirectory(); IndexWriterConfig config = newIndexWriterConfig(); IndexWriter writer = new IndexWriter(dir, config); - IndexReader reader1 = DirectoryReader.open(writer); - IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey(); + ShardId shardId = new ShardId("foo", "bar", 1); + IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); writer.addDocument(new Document()); - IndexReader reader2 = DirectoryReader.open(writer); - IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey(); + IndexReader reader2 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); IOUtils.close(reader1, reader2, writer, dir); - IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1)); - IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1)); - IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2)); + IndicesRequestCache.Key key1 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key2 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key3 = indicesRequestCache.new Key(new TestEntity(null, falseBoolean), new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key4 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey2); + IndicesRequestCache.Key key5 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(2), rKey2); String s = "Some other random object"; assertEquals(key1, key1); assertEquals(key1, key2); @@ -459,6 +468,32 @@ public void testEqualsKey() throws IOException { assertNotEquals(key1, key5); } + public void testSerializationDeserializationOfCacheKey() throws Exception { + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + ShardRequestCache shardRequestCache = new ShardRequestCache(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; + IndexService indexService = createIndex("test"); + IndexShard indexShard = indexService.getShard(0); + IndicesService.IndexShardCacheEntity shardCacheEntity = indicesService.new IndexShardCacheEntity(indexShard); + String readerCacheKeyId = UUID.randomUUID().toString(); + IndicesRequestCache.Key key1 = indicesRequestCache.new Key(shardCacheEntity, termBytes, readerCacheKeyId); + BytesReference bytesReference = null; + try (BytesStreamOutput out = new BytesStreamOutput()) { + key1.writeTo(out); + bytesReference = out.bytes(); + } + StreamInput in = bytesReference.streamInput(); + + IndicesRequestCache.Key key2 = indicesRequestCache.new Key(in); + + assertEquals(readerCacheKeyId, key2.readerCacheKeyUniqueId); + assertEquals(shardCacheEntity.getCacheIdentity(), key2.entity.getCacheIdentity()); + assertEquals(termBytes, key2.value); + + } + private class TestBytesReference extends AbstractBytesReference { int dummyValue; @@ -538,5 +573,8 @@ public Object getCacheIdentity() { public long ramBytesUsed() { return 42; } + + @Override + public void writeTo(StreamOutput out) throws IOException {} } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 415844dccb611..364c7a94cad54 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -41,6 +41,7 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexModule; @@ -59,6 +60,7 @@ import org.opensearch.test.hamcrest.OpenSearchAssertions; import org.opensearch.transport.nio.MockNioTransportPlugin; +import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; @@ -315,6 +317,11 @@ public void testCloseWhileOngoingRequestUsesRequestCache() throws Exception { assertEquals(0L, cache.count()); IndicesRequestCache.CacheEntity cacheEntity = new IndicesRequestCache.CacheEntity() { + @Override + public void writeTo(StreamOutput out) throws IOException { + + } + @Override public long ramBytesUsed() { return 42; From 93c1172af25e0c37d78a251c3eb36c5c0bbdd14f Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 29 Sep 2023 10:28:53 -0700 Subject: [PATCH 2/2] Fixing javadoc issue Signed-off-by: Sagar Upadhyaya --- .../common/lucene/index/OpenSearchDirectoryReader.java | 8 ++++++++ .../java/org/opensearch/indices/IndicesRequestCache.java | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java index e5038436012dd..b2e21d2076cbb 100644 --- a/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java +++ b/server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java @@ -79,6 +79,10 @@ public DelegatingCacheHelper getDelegatingCacheHelper() { return this.delegatingCacheHelper; } + /** + * Wraps existing IndexReader cache helper which internally provides a way to wrap CacheKey. + * @opensearch.internal + */ public class DelegatingCacheHelper implements CacheHelper { CacheHelper cacheHelper; DelegatingCacheKey serializableCacheKey; @@ -103,6 +107,10 @@ public void addClosedListener(ClosedListener listener) { } } + /** + * Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of + * object itself for serialization purposes. + */ public class DelegatingCacheKey { CacheKey cacheKey; private final UUID uniqueId; diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 25a067bdc8d0c..e1d18b1172865 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -198,7 +198,7 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference * * @opensearch.internal */ - protected static class Loader implements CacheLoader { + private static class Loader implements CacheLoader { private final CacheEntity entity; private final CheckedSupplier loader;