Skip to content

Commit

Permalink
move value cache to star tree context + other comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sandesh Kumar <[email protected]>
  • Loading branch information
sandeshkr419 committed Oct 7, 2024
1 parent 326400a commit 89c845d
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,7 @@ public class StarTreeQueryHelper {
* Checks if the search context can be supported by star-tree
*/
public static boolean isStarTreeSupported(SearchContext context) {
return context.aggregations() != null
&& context.mapperService().isCompositeIndexPresent()
&& context.parsedPostFilter() == null
&& context.innerHits().getInnerHits().isEmpty()
&& context.sort() == null
&& context.trackScores() == false
&& context.minimumScore() == null;
return context.aggregations() != null && context.mapperService().isCompositeIndexPresent() && context.parsedPostFilter() == null;
}

/**
Expand All @@ -77,26 +71,16 @@ public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context
compositeMappedFieldType.getCompositeIndexType()
);

StarTreeQueryContext starTreeQueryContext = StarTreeQueryHelper.toStarTreeQueryContext(
starTree,
compositeMappedFieldType,
source.query()
);
if (starTreeQueryContext == null) {
return null;
}

for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) {
MetricStat metricStat = validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory);
if (metricStat == null) {
return null;
}
}

if (context.aggregations().factories().getFactories().length > 1) {
context.initializeStarTreeValuesMap();
}
return starTreeQueryContext;
boolean cacheStarTreeValues = context.aggregations().factories().getFactories().length > 1;

return StarTreeQueryHelper.toStarTreeQueryContext(starTree, compositeMappedFieldType, source.query(), cacheStarTreeValues);
}

/**
Expand All @@ -105,7 +89,8 @@ public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context
private static StarTreeQueryContext toStarTreeQueryContext(
CompositeIndexFieldInfo compositeIndexFieldInfo,
CompositeDataCubeFieldType compositeFieldType,
QueryBuilder queryBuilder
QueryBuilder queryBuilder,
boolean cacheStarTreeValues
) {
Map<String, Long> queryMap;
if (queryBuilder == null || queryBuilder instanceof MatchAllQueryBuilder) {
Expand All @@ -122,7 +107,7 @@ private static StarTreeQueryContext toStarTreeQueryContext(
} else {
return null;
}
return new StarTreeQueryContext(compositeIndexFieldInfo, queryMap);
return new StarTreeQueryContext(compositeIndexFieldInfo, queryMap, cacheStarTreeValues);
}

/**
Expand Down Expand Up @@ -201,14 +186,14 @@ public static LeafBucketCollector getStarTreeLeafCollector(
SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues.getMetricValuesIterator(
metricName
);
// Obtain a FixedBitSet of matched document IDs
FixedBitSet matchedDocIds = getStarTreeFilteredValues(context, ctx, starTreeValues); // Assuming this method gives a FixedBitSet
assert matchedDocIds != null;
// Obtain a FixedBitSet of matched star tree document IDs
FixedBitSet filteredValues = getStarTreeFilteredValues(context, ctx, starTreeValues);
assert filteredValues != null;

int numBits = matchedDocIds.length(); // Get the length of the FixedBitSet
int numBits = filteredValues.length(); // Get the number of the filtered values (matching docs)
if (numBits > 0) {
// Iterate over the FixedBitSet
for (int bit = matchedDocIds.nextSetBit(0); bit != -1; bit = (bit + 1 < numBits) ? matchedDocIds.nextSetBit(bit + 1) : -1) {
// Iterate over the filtered values
for (int bit = filteredValues.nextSetBit(0); bit != -1; bit = (bit + 1 < numBits) ? filteredValues.nextSetBit(bit + 1) : -1) {
// Advance to the entryId in the valuesIterator
if (valuesIterator.advanceExact(bit) == false) {
continue; // Skip if no more entries
Expand Down Expand Up @@ -241,17 +226,17 @@ public void collect(int doc, long bucket) {
*/
public static FixedBitSet getStarTreeFilteredValues(SearchContext context, LeafReaderContext ctx, StarTreeValues starTreeValues)
throws IOException {
if (context.getStarTreeValuesMap() != null && context.getStarTreeValuesMap().containsKey(ctx)) {
return context.getStarTreeValuesMap().get(ctx);
Map<LeafReaderContext, FixedBitSet> valueCache = context.getStarTreeQueryContext().getStarTreeValuesMap();
if (valueCache != null && valueCache.containsKey(ctx)) {
return valueCache.get(ctx);
}

StarTreeFilter filter = new StarTreeFilter(starTreeValues, context.getStarTreeQueryContext().getQueryMap());
FixedBitSet result = filter.getStarTreeResult();

if (context.getStarTreeValuesMap() != null) {
context.getStarTreeValuesMap().put(ctx, result);
if (valueCache != null) {
valueCache.put(ctx, result);
}
return result;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@

package org.opensearch.search.internal;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -127,7 +125,6 @@ public List<InternalAggregation> toInternalAggregations(Collection<Collector> co
private final List<Releasable> releasables = new CopyOnWriteArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private InnerHitsContext innerHitsContext;
protected volatile Map<LeafReaderContext, FixedBitSet> starTreeValuesMap;
private volatile boolean searchTimedOut;
private StarTreeQueryContext starTreeQueryContext;

Expand Down Expand Up @@ -543,12 +540,4 @@ public SearchContext starTreeQueryContext(StarTreeQueryContext starTreeQueryCont
public StarTreeQueryContext getStarTreeQueryContext() {
return this.starTreeQueryContext;
}

public void initializeStarTreeValuesMap() {
this.starTreeValuesMap = new HashMap<>();
}

public Map<LeafReaderContext, FixedBitSet> getStarTreeValuesMap() {
return starTreeValuesMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

package org.opensearch.search.startree;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Query class for querying star tree data structure.
Expand All @@ -31,23 +34,21 @@ public class StarTreeQueryContext {
* Map of field name to a value to be queried for that field
* This is used to filter the data based on the query
*/
private final Map<String, Long> queryMap;
private volatile Map<String, Long> queryMap;

// /**
// * Cache for leaf results
// * This is used to cache the results for each leaf reader context
// * to avoid reading the data from the leaf reader context multiple times
// */
// private volatile Map<LeafReaderContext, Map<String, StarTreeQueryHelper.MetricInfo>> leafResultsCache;

// /**
// * List of metrics to be computed & cached
// */
// private List<StarTreeQueryHelper.MetricInfo> metrics;
/**
* Cache for leaf results
* This is used to cache the results for each leaf reader context
* to avoid reading the filtered values from the leaf reader context multiple times
*/
protected volatile Map<LeafReaderContext, FixedBitSet> starTreeValuesMap;

public StarTreeQueryContext(CompositeIndexFieldInfo starTree, Map<String, Long> queryMap) {
public StarTreeQueryContext(CompositeIndexFieldInfo starTree, Map<String, Long> queryMap, boolean cacheStarTreeValues) {
this.starTree = starTree;
this.queryMap = queryMap;
if (cacheStarTreeValues) {
starTreeValuesMap = new ConcurrentHashMap<>();
}
}

public CompositeIndexFieldInfo getStarTree() {
Expand All @@ -57,4 +58,8 @@ public CompositeIndexFieldInfo getStarTree() {
public Map<String, Long> getQueryMap() {
return queryMap;
}

public Map<LeafReaderContext, FixedBitSet> getStarTreeValuesMap() {
return starTreeValuesMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.action.OriginalIndices;
import org.opensearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.Strings;
import org.opensearch.index.IndexService;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.codec.composite99.datacube.startree.StarTreeDocValuesFormatTests;
import org.opensearch.index.compositeindex.CompositeIndexSettings;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeIndexSettings;
import org.opensearch.index.mapper.CompositeMappedFieldType;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.ReaderContext;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.startree.StarTreeQueryContext;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import java.io.IOException;
import java.util.Map;

import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;

/**
 * Verifies when {@code SearchService#createContext} attaches a {@link StarTreeQueryContext} to the
 * {@link SearchContext}, and whether that context is created with its per-leaf value cache.
 */
public class SearchServiceStarTreeTests extends OpenSearchSingleNodeTestCase {

    /**
     * Creates a composite (star-tree) index and checks, for a series of query/aggregation
     * combinations, whether a star-tree query context is produced and whether its cache is initialized.
     *
     * @throws IOException if creating the reader or search context fails
     */
    public void testParseQueryToOriginalOrStarTreeQuery() throws IOException {
        FeatureFlags.initializeFeatureFlags(Settings.builder().put(FeatureFlags.STAR_TREE_INDEX, true).build());
        setStarTreeIndexSetting("true");

        // Reset the transient cluster setting even when an assertion below fails,
        // so a failure here does not leave the setting behind.
        try {
            Settings settings = Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                .put(StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.getKey(), true)
                .build();
            CreateIndexRequestBuilder builder = client().admin()
                .indices()
                .prepareCreate("test")
                .setSettings(settings)
                .setMapping(StarTreeDocValuesFormatTests.getExpandedMapping());
            createIndex("test", builder);

            IndicesService indicesService = getInstanceFromNode(IndicesService.class);
            IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test"));
            IndexShard indexShard = indexService.getShard(0);
            ShardSearchRequest request = new ShardSearchRequest(
                OriginalIndices.NONE,
                new SearchRequest().allowPartialSearchResults(true),
                indexShard.shardId(),
                1,
                new AliasFilter(null, Strings.EMPTY_ARRAY),
                1.0f,
                -1,
                null,
                null
            );

            // Case 1: No query or aggregations, should not use star tree
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            assertStarTreeContext(request, sourceBuilder, null, false);

            // Case 2: MatchAllQuery present but no aggregations, should not use star tree
            sourceBuilder = new SearchSourceBuilder().query(new MatchAllQueryBuilder());
            assertStarTreeContext(request, sourceBuilder, null, false);

            // Case 3: MatchAllQuery and aggregations present, should use star tree
            sourceBuilder = new SearchSourceBuilder().size(0)
                .query(new MatchAllQueryBuilder())
                .aggregation(AggregationBuilders.max("test").field("field"));
            CompositeIndexFieldInfo expectedStarTree = new CompositeIndexFieldInfo(
                "startree",
                CompositeMappedFieldType.CompositeFieldType.STAR_TREE
            );
            Map<String, Long> expectedQueryMap = null;
            assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, expectedQueryMap, false), false);

            // Case 4: MatchAllQuery and aggregations present, but postFilter specified, should not use star tree
            sourceBuilder = new SearchSourceBuilder().size(0)
                .query(new MatchAllQueryBuilder())
                .aggregation(AggregationBuilders.max("test").field("field"))
                .postFilter(new MatchAllQueryBuilder());
            assertStarTreeContext(request, sourceBuilder, null, false);

            // Case 5: TermQuery and single aggregation, should use star tree, but not initialize query cache
            sourceBuilder = new SearchSourceBuilder().size(0)
                .query(new TermQueryBuilder("sndv", 1))
                .aggregation(AggregationBuilders.max("test").field("field"));
            expectedQueryMap = Map.of("sndv", 1L);
            assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, expectedQueryMap, false), false);

            // Case 6: TermQuery and multiple aggregations present, should use star tree & initialize cache
            sourceBuilder = new SearchSourceBuilder().size(0)
                .query(new TermQueryBuilder("sndv", 1))
                .aggregation(AggregationBuilders.max("test").field("field"))
                .aggregation(AggregationBuilders.sum("test2").field("field"));
            expectedQueryMap = Map.of("sndv", 1L);
            assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, expectedQueryMap, true), true);

            // Case 7: No query, metric aggregations present, should use star tree
            sourceBuilder = new SearchSourceBuilder().size(0).aggregation(AggregationBuilders.max("test").field("field"));
            assertStarTreeContext(request, sourceBuilder, new StarTreeQueryContext(expectedStarTree, null, false), false);
        } finally {
            setStarTreeIndexSetting(null);
        }
    }

    /**
     * Updates the transient cluster setting that enables the star-tree index and waits for the
     * update request to complete before returning.
     *
     * @param value the setting value to apply; the test passes {@code null} to undo its earlier {@code "true"}
     * @throws IOException declared for signature compatibility with existing callers
     */
    private void setStarTreeIndexSetting(String value) throws IOException {
        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(Settings.builder().put(CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING.getKey(), value).build())
            // get() blocks on the response; a bare execute() only returns a future, letting
            // later steps race with the settings update.
            .get();
    }

    /**
     * Builds a search context for {@code sourceBuilder} and compares its star-tree query context
     * against the expectation.
     *
     * @param request            shard request reused across cases; its source is overwritten here
     * @param sourceBuilder      the search source to parse
     * @param expectedContext    expected star-tree context, or {@code null} when none should be created
     * @param expectedCacheUsage whether the star-tree values cache map is expected to be non-null;
     *                           only checked when {@code expectedContext} is non-null
     * @throws IOException if creating the reader or search context fails
     */
    private void assertStarTreeContext(
        ShardSearchRequest request,
        SearchSourceBuilder sourceBuilder,
        StarTreeQueryContext expectedContext,
        boolean expectedCacheUsage
    ) throws IOException {
        request.source(sourceBuilder);
        SearchService searchService = getInstanceFromNode(SearchService.class);
        // Close the search context together with the reader so neither outlives this check.
        try (
            ReaderContext reader = searchService.createOrGetReaderContext(request, false);
            SearchContext context = searchService.createContext(reader, request, null, true)
        ) {
            StarTreeQueryContext actualContext = context.getStarTreeQueryContext();

            if (expectedContext == null) {
                assertThat(actualContext, nullValue());
            } else {
                assertThat(actualContext, notNullValue());
                assertEquals(expectedContext.getStarTree().getType(), actualContext.getStarTree().getType());
                assertEquals(expectedContext.getStarTree().getField(), actualContext.getStarTree().getField());
                assertEquals(expectedContext.getQueryMap(), actualContext.getQueryMap());
                assertThat(actualContext.getStarTreeValuesMap(), expectedCacheUsage ? notNullValue() : nullValue());
            }
            searchService.doStop();
        }
    }
}
Loading

0 comments on commit 89c845d

Please sign in to comment.