Star Tree Request/Response structure #227

Closed · wants to merge 3 commits
server/src/main/java/org/opensearch/index/query/QueryShardContext.java

@@ -56,7 +56,11 @@
import org.opensearch.index.IndexSortConfig;
import org.opensearch.index.analysis.IndexAnalyzers;
import org.opensearch.index.cache.bitset.BitsetFilterCache;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.fielddata.IndexFieldData;
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
import org.opensearch.index.mapper.ContentPath;
import org.opensearch.index.mapper.DerivedFieldResolver;
import org.opensearch.index.mapper.DerivedFieldResolverFactory;
@@ -73,13 +77,17 @@
import org.opensearch.script.ScriptContext;
import org.opensearch.script.ScriptFactory;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.AggregatorFactory;
import org.opensearch.search.aggregations.metrics.SumAggregatorFactory;
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.lookup.SearchLookup;
import org.opensearch.search.query.startree.StarTreeQuery;
import org.opensearch.search.startree.OriginalOrStarTreeQuery;
import org.opensearch.search.startree.StarTreeQuery;
import org.opensearch.transport.RemoteClusterAware;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -90,6 +98,7 @@
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@@ -529,6 +538,66 @@ private ParsedQuery toQuery(QueryBuilder queryBuilder, CheckedFunction<QueryBuil
}
}

public ParsedQuery toStarTreeQuery(CompositeIndexFieldInfo starTree, QueryBuilder queryBuilder, Query query) {
Map<String, List<Predicate<Long>>> predicateMap = getStarTreePredicates(queryBuilder);
StarTreeQuery starTreeQuery = new StarTreeQuery(starTree, predicateMap, null);
OriginalOrStarTreeQuery originalOrStarTreeQuery = new OriginalOrStarTreeQuery(starTreeQuery, query);
return new ParsedQuery(originalOrStarTreeQuery);
}

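OriginalOrStarTreeQuery itself is not part of this diff. As a minimal sketch, its shape can be inferred from the call sites here and in SumAggregator below; the starTreeUsed flag is an assumption about how the wrapper records which query was chosen at rewrite time.

    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.QueryVisitor;

    // Sketch (inferred, not the PR's source): carries both the star-tree
    // rewrite and the original query, and records which one was used.
    public class OriginalOrStarTreeQuery extends Query {
        private final StarTreeQuery starTreeQuery;
        private final Query originalQuery;
        private boolean starTreeUsed = false;

        public OriginalOrStarTreeQuery(StarTreeQuery starTreeQuery, Query originalQuery) {
            this.starTreeQuery = starTreeQuery;
            this.originalQuery = originalQuery;
        }

        public boolean isStarTreeUsed() { return starTreeUsed; }
        public StarTreeQuery getStarTreeQuery() { return starTreeQuery; }
        public Query getOriginalQuery() { return originalQuery; }

        @Override
        public String toString(String field) { return "OriginalOrStarTreeQuery"; }

        @Override
        public void visit(QueryVisitor visitor) { originalQuery.visit(visitor); }

        @Override
        public boolean equals(Object obj) {
            return sameClassAs(obj) && originalQuery.equals(((OriginalOrStarTreeQuery) obj).originalQuery);
        }

        @Override
        public int hashCode() { return classHash() ^ originalQuery.hashCode(); }
    }
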
/**
 * Parses the query body into star-tree dimension predicates.
 * @param queryBuilder the incoming query to translate
 * @return a map from dimension field name to the predicates that must match on it
 */
private Map<String, List<Predicate<Long>>> getStarTreePredicates(QueryBuilder queryBuilder) {
// Assuming the following variables have been initialized:
Map<String, List<Predicate<Long>>> predicateMap = new HashMap<>();

// Check if the query builder is an instance of TermQueryBuilder
if (queryBuilder instanceof TermQueryBuilder) {
TermQueryBuilder tq = (TermQueryBuilder) queryBuilder;
String field = tq.fieldName();

Review comment: Shouldn't this be converted to the dimension field name of the star tree?

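A hedged sketch of that suggested conversion: resolve the term query's field against the star tree's configured dimensions before building predicates. The helper below is hypothetical and assumes CompositeDataCubeFieldType exposes its dimensions the same way validateStarTreeMetricSuport below uses getMetrics().

    import org.opensearch.index.compositeindex.datacube.Dimension;
    import org.opensearch.index.mapper.CompositeDataCubeFieldType;

    // Sketch (not in the PR): map a query field to a star-tree dimension name,
    // failing fast when the field is not a dimension of this star tree.
    private String toDimensionFieldName(CompositeDataCubeFieldType starTreeType, String queryField) {
        return starTreeType.getDimensions()
            .stream()
            .map(Dimension::getField)
            .filter(dimensionField -> dimensionField.equals(queryField))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException(
                "Field [" + queryField + "] is not a star-tree dimension"));
    }
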
long inputQueryVal = Long.parseLong(tq.value().toString());

Review comment: Convert to sortable long.

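The conversion the reviewer is pointing at is plausibly Lucene's sortable-long encoding: star-tree doc values for double fields are stored as sortable longs, so the query-side value has to be encoded the same way before dimVal == inputQueryVal compares like with like. A sketch under that assumption:

    import org.apache.lucene.util.NumericUtils;

    // Sketch: encode a raw query value into the sortable-long domain that the
    // star tree uses for double dimension values.
    static long toSortableLong(Object queryValue) {
        return NumericUtils.doubleToSortableLong(Double.parseDouble(queryValue.toString()));
    }
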

// Get or create the list of predicates for the given field
List<Predicate<Long>> predicates = predicateMap.getOrDefault(field, new ArrayList<>());

Review comment: If we use predicates, we can't use binary search during star-tree traversal.

Author reply: Haven't put much thought into this yet; let me revisit how to better store this information for filtering. For the request/response parsing I just took inspiration from previous POCs.

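One direction this thread suggests, sketched purely as an assumption rather than anything in the PR: carry the exact matched value per dimension instead of an opaque Predicate<Long>, so traversal can binary-search a node's sorted children.

    // Sketch: an exact-match filter keeps the raw encoded value accessible,
    // which lets star-tree traversal binary-search sorted child nodes instead
    // of linearly testing an opaque predicate against every child.
    final class ExactMatchDimensionFilter {
        private final String dimension;
        private final long value;

        ExactMatchDimensionFilter(String dimension, long value) {
            this.dimension = dimension;
            this.value = value;
        }

        String dimension() { return dimension; }
        long value() { return value; }

        // Still usable where a predicate is expected.
        boolean matches(long dimensionValue) { return dimensionValue == value; }
    }
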

// Create a predicate to match the input query value
Predicate<Long> predicate = dimVal -> dimVal == inputQueryVal;
predicates.add(predicate);

// Put the predicates list back into the map
predicateMap.put(field, predicates);
} else {
throw new IllegalArgumentException("The query is not a term query");
}
return predicateMap;

}

public boolean validateStarTreeMetricSuport(CompositeDataCubeFieldType compositeIndexFieldInfo, AggregatorFactory aggregatorFactory) {
String field = null;
Map<String, List<MetricStat>> supportedMetrics = compositeIndexFieldInfo.getMetrics()
.stream()
.collect(Collectors.toMap(Metric::getField, Metric::getMetrics));

// Existing support only for MetricAggregators without sub-aggregations
if (aggregatorFactory.getSubFactories().getFactories().length != 0) {
return false;
}

// TODO: add support for more aggregation types
if (aggregatorFactory instanceof SumAggregatorFactory) {
field = ((SumAggregatorFactory) aggregatorFactory).getField();
if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(MetricStat.SUM)) {
return true;
}
}

return false;
}
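
The TODO above is about widening this check beyond sum. A hedged sketch of the shape that extension might take; MinAggregatorFactory exists in the same package, but its getField() accessor is assumed here in the same way the PR assumes one on SumAggregatorFactory.

    // Sketch: each supported factory type maps to the MetricStat that the
    // star tree must have materialized for the target field.
    if (aggregatorFactory instanceof SumAggregatorFactory) {
        field = ((SumAggregatorFactory) aggregatorFactory).getField();
        return supportedMetrics.getOrDefault(field, List.of()).contains(MetricStat.SUM);
    }
    if (aggregatorFactory instanceof MinAggregatorFactory) {
        field = ((MinAggregatorFactory) aggregatorFactory).getField();
        return supportedMetrics.getOrDefault(field, List.of()).contains(MetricStat.MIN);
    }
    return false;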

public Index index() {
return indexSettings.getIndex();
}
server/src/main/java/org/opensearch/search/SearchService.java (53 changes: 30 additions & 23 deletions)

@@ -77,12 +77,16 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
import org.opensearch.index.mapper.DerivedFieldResolver;
import org.opensearch.index.mapper.DerivedFieldResolverFactory;
import org.opensearch.index.mapper.StarTreeMapper;
import org.opensearch.index.query.InnerHitContextBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.MatchNoneQueryBuilder;
import org.opensearch.index.query.ParsedQuery;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryRewriteContext;
import org.opensearch.index.query.QueryShardContext;
@@ -97,13 +101,13 @@
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.AggregationInitializationException;
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.aggregations.AggregatorFactory;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.InternalAggregation.ReduceContext;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.opensearch.search.aggregations.startree.StarTreeAggregator;
import org.opensearch.search.aggregations.startree.StarTreeAggregatorFactory;
import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.dfs.DfsPhase;
@@ -1349,9 +1353,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
if (source.sorts() != null) {
try {
Optional<SortAndFormats> optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext());
if (optionalSort.isPresent()) {
context.sort(optionalSort.get());
}
optionalSort.ifPresent(context::sort);
} catch (IOException e) {
throw new SearchException(shardTarget, "failed to create sort elements", e);
}
@@ -1509,35 +1511,40 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc

if (canUseStarTree) {
try {
if (setStarTreeQuery(context, queryShardContext, source)) {
logger.info("Star Tree will be used in execution");
};
setStarTreeQuery(context, queryShardContext, source);
logger.info("using star tree");
} catch (IOException e) {
logger.info("Cannot use star-tree");
logger.info("not using star tree");
}

}
}

private boolean setStarTreeQuery(SearchContext context, QueryShardContext queryShardContext, SearchSourceBuilder source) throws IOException {

// TODO: (finish)
// 1. Check criteria for star-tree query / aggregation formation
// 2: Set StarTree Query & Star Tree Aggregator here

private boolean setStarTreeQuery(SearchContext context, QueryShardContext queryShardContext, SearchSourceBuilder source)
throws IOException {
if (source.aggregations() == null) {
return false;
}

context.parsedQuery(queryShardContext.toStarTreeQuery(null, Set.of("sum_status")));

StarTreeAggregatorFactory factory = new StarTreeAggregatorFactory("sum_status", queryShardContext, null, null, null, List.of("status"), List.of("sum"));
StarTreeAggregatorFactory[] factories = {factory};
AggregatorFactories aggregatorFactories = new AggregatorFactories(factories);
// TODO: Support for multiple startrees
CompositeDataCubeFieldType compositeMappedFieldType = (StarTreeMapper.StarTreeFieldType) context.mapperService()
.getCompositeFieldTypes()
.iterator()
.next();
CompositeIndexFieldInfo starTree = new CompositeIndexFieldInfo(
compositeMappedFieldType.name(),
compositeMappedFieldType.getCompositeIndexType()
);

context.aggregations(new SearchContextAggregations(aggregatorFactories, multiBucketConsumerService.create()));
ParsedQuery parsedQuery = queryShardContext.toStarTreeQuery(starTree, source.query(), context.query());
AggregatorFactory aggregatorFactory = context.aggregations().factories().getFactories()[0];
if (!(aggregatorFactory instanceof ValuesSourceAggregatorFactory
&& aggregatorFactory.getSubFactories().getFactories().length == 0)) {
return false;
}

// StarTreeAggregatorFactory factory = new StarTreeAggregatorFactory()
if (queryShardContext.validateStarTreeMetricSuport(compositeMappedFieldType, aggregatorFactory)) {
context.parsedQuery(parsedQuery);
}
return true;
}
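
For orientation, a hedged example of a request that would pass these gates: one term query plus one sum aggregation with no sub-aggregations. The field names are hypothetical and would have to match a star-tree dimension ("status") and a field with a sum metric ("size") in the index's star-tree mapping.

    import org.opensearch.index.query.QueryBuilders;
    import org.opensearch.search.aggregations.AggregationBuilders;
    import org.opensearch.search.builder.SearchSourceBuilder;

    // Sketch: the kind of source that setStarTreeQuery can rewrite.
    SearchSourceBuilder source = new SearchSourceBuilder()
        .query(QueryBuilders.termQuery("status", 200))
        .aggregation(AggregationBuilders.sum("total_size").field("size"));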

server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java

@@ -661,4 +661,8 @@ public PipelineTree buildPipelineTree()
return new PipelineTree(subTrees, aggregators);
}
}

public AggregatorFactory[] getFactories() {
return factories;
}
}
server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java

@@ -127,4 +127,8 @@ protected boolean supportsConcurrentSegmentSearch()
public boolean evaluateChildFactories() {
return factories.allFactoriesSupportConcurrentSearch();
}

public AggregatorFactories getSubFactories() {
return factories;
}
}
server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java

@@ -31,13 +31,20 @@

package org.opensearch.search.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentReader;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.util.Comparators;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.codec.composite.CompositeIndexReader;
import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.sort.SortOrder;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
* Base class to aggregate all docs into a single numeric metric value.
@@ -107,4 +114,14 @@ public BucketComparator bucketComparator(String key, SortOrder order)
return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(key, lhs), metric(key, rhs), order == SortOrder.ASC);
}
}

protected StarTreeValues getStarTreeValues(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException {
SegmentReader reader = Lucene.segmentReader(ctx.reader());
if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) return null;

Review comment: We need to see if it's better to load these as a doubleValuesSource, similar to how existing fields are loaded, and to load only the specific fields requested instead of the entire star-tree values (for example, in the sum aggregator we can fetch the doubleFieldData of a particular star-tree metric field, e.g. sum_status_metric).

Then you don't need to worry about conversion either.

CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader();
StarTreeValues values = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree);
final AtomicReference<StarTreeValues> aggrVal = new AtomicReference<>(null);

return values;
}
}
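
A sketch of the load-as-doubles idea from the review comment above, using the FieldData helper the reviewer also quotes further down in SumAggregator; the iterator-map lookup and cast mirror the collector changes below.

    import org.apache.lucene.index.SortedNumericDocValues;
    import org.opensearch.index.fielddata.FieldData;
    import org.opensearch.index.fielddata.SortedNumericDoubleValues;

    // Sketch: wrap a star-tree metric's sortable-long doc values as doubles
    // once per leaf, so per-document collection needs no manual conversion.
    SortedNumericDocValues raw = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName);
    SortedNumericDoubleValues doubles = FieldData.sortableLongBitsToDoubles(raw);
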
server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java

@@ -32,10 +32,14 @@
package org.opensearch.search.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.DoubleArray;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper;
import org.opensearch.index.fielddata.SortedNumericDoubleValues;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
@@ -45,6 +49,8 @@
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.OriginalOrStarTreeQuery;
import org.opensearch.search.startree.StarTreeQuery;

import java.io.IOException;
import java.util.Map;
@@ -56,13 +62,13 @@
*/
public class SumAggregator extends NumericMetricsAggregator.SingleValue {

private final ValuesSource.Numeric valuesSource;
private final DocValueFormat format;
protected final ValuesSource.Numeric valuesSource;
protected final DocValueFormat format;

private DoubleArray sums;
private DoubleArray compensations;
protected DoubleArray sums;
protected DoubleArray compensations;

SumAggregator(
public SumAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
SearchContext context,
Expand All @@ -86,6 +92,14 @@ public ScoreMode scoreMode() {

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) {
StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery();
return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree());
}
return getDefaultLeafCollector(ctx, sub);
}

private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
@@ -118,6 +132,45 @@ public void collect(int doc, long bucket) throws IOException {
};
}

private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree)
throws IOException {
final BigArrays bigArrays = context.bigArrays();
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);

StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree);

String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName();
String metricName = StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues(starTree.getField(), fieldName, "sum");


return new LeafBucketCollectorBase(sub, starTreeValues) {
@Override
public void collect(int doc, long bucket) throws IOException {
// TODO: Fix the response for collecting star tree sum
sums = bigArrays.grow(sums, bucket + 1);

Review comment: Can we extract out the default implementation (getDefaultLeafCollector) and reuse the same logic? I really like the approach of reusing the existing aggregators.

Author reply: Yes, I'll try to do that; I'm inclined toward refactoring and reusing the same implementations wherever possible.

Review comment:

    final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);

We can check whether we are able to get SortedNumericDoubleValues here; otherwise we need to convert to double for each doc.

compensations = bigArrays.grow(compensations, bucket + 1);
compensations.set(bucket, kahanSummation.delta());

SortedNumericDocValues dv = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName);

if (dv.advanceExact(doc)) {
final int valuesCount = dv.docValueCount();
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);

for (int i = 0; i < valuesCount; i++) {
double value = Double.longBitsToDouble(dv.nextValue());

Review comment: Shouldn't we do this?

    public static Double sortableLongtoDouble(Long value) {
        return NumericUtils.sortableLongToDouble(value);
    }

Review comment: Also, let's see if we can get double sorted-numeric doc values ahead of time:

    @Override
    public SortedNumericDoubleValues getDoubleValues() {
        try {
            SortedNumericDocValues raw = DocValues.getSortedNumeric(reader, field);
            return FieldData.sortableLongBitsToDoubles(raw);
        } catch (IOException e) {
            throw new IllegalStateException("Cannot load doc values", e);
        }
    }

kahanSummation.add(value);
}

compensations.set(bucket, kahanSummation.delta());
sums.set(bucket, kahanSummation.value());
}
}
};
}
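
One hedged refinement to the collector above, an observation rather than part of the PR: the metric iterator is fetched from the map inside collect(), i.e. once per document. Resolving it once per leaf keeps the map lookup out of the hot loop.

    // Sketch: resolve the metric's doc-values iterator once per segment,
    // then reuse it for every collected document.
    SortedNumericDocValues dv = (SortedNumericDocValues) starTreeValues.getMetricDocValuesIteratorMap().get(metricName);
    return new LeafBucketCollectorBase(sub, starTreeValues) {
        @Override
        public void collect(int doc, long bucket) throws IOException {
            sums = bigArrays.grow(sums, bucket + 1);
            compensations = bigArrays.grow(compensations, bucket + 1);
            if (dv.advanceExact(doc)) {
                kahanSummation.reset(sums.get(bucket), compensations.get(bucket));
                for (int i = 0; i < dv.docValueCount(); i++) {
                    kahanSummation.add(Double.longBitsToDouble(dv.nextValue()));
                }
                compensations.set(bucket, kahanSummation.delta());
                sums.set(bucket, kahanSummation.value());
            }
        }
    };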

@Override
public double metric(long owningBucketOrd) {
if (valuesSource == null || owningBucketOrd >= sums.size()) {
Expand Down
server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java

@@ -52,7 +52,7 @@
*
* @opensearch.internal
*/
class SumAggregatorFactory extends ValuesSourceAggregatorFactory {
public class SumAggregatorFactory extends ValuesSourceAggregatorFactory {

SumAggregatorFactory(
String name,
server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java

@@ -625,6 +625,10 @@ public SortedNumericDocValues longValues(LeafReaderContext context)
public SortedNumericDoubleValues doubleValues(LeafReaderContext context) {
return indexFieldData.load(context).getDoubleValues();
}

public String getIndexFieldName() {
return indexFieldData.getFieldName();
}
}

/**