From 70a6935020d56b4a3b295228c18d3fe161264948 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 23 Aug 2024 10:42:05 -0700 Subject: [PATCH] [Bugfix] Fixes IRC NPE bug for timed-out cacheable queries (#15327) * Fix IRC timeout bug Signed-off-by: Peter Alfonsi * addressed Sagar's comments Signed-off-by: Peter Alfonsi * addressed Ankit's comments Signed-off-by: Peter Alfonsi * Add UT for test coverage Signed-off-by: Peter Alfonsi * rerun gradle Signed-off-by: Peter Alfonsi * tweak imports in new UT Signed-off-by: Peter Alfonsi * rerun gradle Signed-off-by: Peter Alfonsi * rerun gradle Signed-off-by: Peter Alfonsi * rerun gradle Signed-off-by: Peter Alfonsi --------- Signed-off-by: Peter Alfonsi Co-authored-by: Peter Alfonsi --- .../indices/IndicesRequestCacheIT.java | 62 +++++++++++++++++++ .../indices/IndicesRequestCache.java | 11 ++-- .../opensearch/indices/IndicesService.java | 4 +- .../indices/IndicesServiceTests.java | 35 +++++++++++ 4 files changed, 104 insertions(+), 8 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 500a2e93fb6bb..fad1798f52ef5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -34,6 +34,12 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.Weight; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; @@ -58,7 +64,10 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.cache.request.RequestCacheStats; +import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.opensearch.search.aggregations.bucket.histogram.Histogram; @@ -66,6 +75,7 @@ import org.opensearch.test.ParameterizedOpenSearchIntegTestCase; import org.opensearch.test.hamcrest.OpenSearchAssertions; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.ZoneId; @@ -769,6 +779,58 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception { stats = getNodeCacheStats(client(node_2)); assertTrue(stats.getMemorySizeInBytes() == 0); } + public void testTimedOutQuery() throws Exception { + // A timed out query should be cached and then invalidated + Client client = client(); + String index = "index"; + assertAcked( + client.admin() + .indices() + .prepareCreate(index) + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) + ) + .get() + ); + indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); + ensureSearchable(index); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + forceMerge(client, index); + + QueryBuilder timeoutQueryBuilder = new TermQueryBuilder("k", "hello") { + @Override + protected Query doToQuery(QueryShardContext context) { + return new TermQuery(new Term("k", "hello")) { + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + // Create the weight before sleeping. Otherwise, TermStates.build() (in the call to super.createWeight()) will + // sometimes throw an exception on timeout, rather than timing out gracefully. + Weight result = super.createWeight(searcher, scoreMode, boost); + try { + Thread.sleep(500); + } catch (InterruptedException ignored) {} + return result; + } + }; + } + }; + + SearchResponse resp = client.prepareSearch(index) + .setRequestCache(true) + .setQuery(timeoutQueryBuilder) + .setTimeout(TimeValue.ZERO) + .get(); + assertTrue(resp.isTimedOut()); + RequestCacheStats requestCacheStats = getRequestCacheStats(client, index); + // The cache should be empty as the timed-out query was invalidated + assertEquals(0, requestCacheStats.getMemorySizeInBytes()); + } private Path shardDirectory(String server, Index index, int shard) { NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server); diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 93946fa11de13..71f8cf5a78ec5 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -310,12 +310,11 @@ BytesReference getOrCompute( * @param cacheKey the cache key to invalidate */ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { - assert reader.getReaderCacheHelper() != null; - String readerCacheKeyId = null; - if (reader instanceof OpenSearchDirectoryReader) { - IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); - readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId(); - } + assert reader.getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper; + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader + .getReaderCacheHelper(); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + IndexShard indexShard = (IndexShard) cacheEntity.getCacheIdentity(); cache.invalidate(getICacheKey(new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, System.identityHashCode(indexShard)))); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 416a272353f6c..b64791bf40581 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -68,6 +68,7 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader.DelegatingCacheHelper; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -1659,8 +1660,7 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) { if (context.getQueryShardContext().isCacheable() == false) { return false; } - return true; - + return context.searcher().getDirectoryReader().getReaderCacheHelper() instanceof DelegatingCacheHelper; } /** diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java index d679a3cc10ba4..bc39f85c04784 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java @@ -31,12 +31,15 @@ package org.opensearch.indices; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.similarities.BM25Similarity; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.Version; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.IndexShardStats; +import org.opensearch.action.search.SearchType; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexGraveyard; @@ -44,6 +47,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader.DelegatingCacheHelper; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -77,9 +81,12 @@ import org.opensearch.plugins.EnginePlugin; import org.opensearch.plugins.MapperPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.search.internal.ContextIndexSearcher; +import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.test.VersionUtils; +import org.opensearch.test.TestSearchContext; import org.opensearch.test.hamcrest.RegexMatcher; import java.io.IOException; @@ -633,4 +640,32 @@ public void testClusterRemoteTranslogBufferIntervalDefault() { IndicesService indicesService = getIndicesService(); assertEquals(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, indicesService.getClusterRemoteTranslogBufferInterval()); } + + public void testDirectoryReaderWithoutDelegatingCacheHelperNotCacheable() throws IOException { + IndicesService indicesService = getIndicesService(); + final IndexService indexService = createIndex("test"); + ShardSearchRequest request = mock(ShardSearchRequest.class); + when(request.requestCache()).thenReturn(true); + + TestSearchContext context = new TestSearchContext(indexService.getBigArrays(), indexService) { + @Override + public SearchType searchType() { + return SearchType.QUERY_THEN_FETCH; + } + }; + + ContextIndexSearcher searcher = mock(ContextIndexSearcher.class); + context.setSearcher(searcher); + DirectoryReader reader = mock(DirectoryReader.class); + when(searcher.getDirectoryReader()).thenReturn(reader); + when(searcher.getIndexReader()).thenReturn(reader); + IndexReader.CacheHelper notDelegatingCacheHelper = mock(IndexReader.CacheHelper.class); + DelegatingCacheHelper delegatingCacheHelper = mock(DelegatingCacheHelper.class); + + for (boolean useDelegatingCacheHelper : new boolean[] { true, false }) { + IndexReader.CacheHelper cacheHelper = useDelegatingCacheHelper ? delegatingCacheHelper : notDelegatingCacheHelper; + when(reader.getReaderCacheHelper()).thenReturn(cacheHelper); + assertEquals(useDelegatingCacheHelper, indicesService.canCache(request, context)); + } + } }