diff --git a/server/src/main/java/org/apache/lucene/codecs/lucene90/Composite99DocValuesConsumer.java b/server/src/main/java/org/apache/lucene/codecs/lucene90/Composite99DocValuesConsumer.java new file mode 100644 index 0000000000000..c0a22a0b32fd2 --- /dev/null +++ b/server/src/main/java/org/apache/lucene/codecs/lucene90/Composite99DocValuesConsumer.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.apache.lucene.codecs.lucene90; + +import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.SegmentWriteState; + +import java.io.IOException; + +/** + * This class is an abstraction of the {@link DocValuesConsumer} for the Star Tree index structure. + * It is responsible to consume various types of document values (numeric, binary, sorted, sorted numeric, + * and sorted set) for fields in the Star Tree index. + * + * @opensearch.experimental + */ +public class Composite99DocValuesConsumer extends DocValuesConsumer { + + Lucene90DocValuesConsumer lucene90DocValuesConsumer; + + public Composite99DocValuesConsumer( + SegmentWriteState state, + String dataCodec, + String dataExtension, + String metaCodec, + String metaExtension + ) throws IOException { + lucene90DocValuesConsumer = new Lucene90DocValuesConsumer(state, dataCodec, dataExtension, metaCodec, metaExtension); + } + + @Override + public void close() throws IOException { + lucene90DocValuesConsumer.close(); + } + + @Override + public void addNumericField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addNumericField(fieldInfo, docValuesProducer); + } + + @Override + public void addBinaryField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addNumericField(fieldInfo, docValuesProducer); + } + + @Override + public void addSortedField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addSortedField(fieldInfo, docValuesProducer); + } + + @Override + public void addSortedNumericField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addSortedNumericField(fieldInfo, docValuesProducer); + } + + @Override + public void addSortedSetField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addSortedSetField(fieldInfo, docValuesProducer); + } +} diff --git a/server/src/main/java/org/apache/lucene/codecs/lucene90/StarTree99DocValuesProducer.java b/server/src/main/java/org/apache/lucene/codecs/lucene90/StarTree99DocValuesProducer.java index 2d96d5a505694..154f4626bc472 100644 --- a/server/src/main/java/org/apache/lucene/codecs/lucene90/StarTree99DocValuesProducer.java +++ b/server/src/main/java/org/apache/lucene/codecs/lucene90/StarTree99DocValuesProducer.java @@ -21,15 +21,15 @@ import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.index.VectorEncoding; import org.apache.lucene.index.VectorSimilarityFunction; -import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry; -import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeConstants; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues; + /** * This class is a custom abstraction of the {@link DocValuesProducer} for the Star Tree index structure. * It is responsible for providing access to various types of document values (numeric, binary, sorted, sorted numeric, @@ -41,7 +41,7 @@ public class StarTree99DocValuesProducer extends DocValuesProducer { Lucene90DocValuesProducer lucene90DocValuesProducer; private final List dimensions; - private final List metrics; + private final List metrics; private final FieldInfos fieldInfos; public StarTree99DocValuesProducer( @@ -55,15 +55,10 @@ public StarTree99DocValuesProducer( String compositeFieldName ) throws IOException { this.dimensions = dimensions; - this.metrics = new ArrayList<>(); - for (MetricEntry metricEntry : metricEntries) { - this.metrics.add( - MetricAggregatorInfo.toFieldName(compositeFieldName, metricEntry.getMetricName(), metricEntry.getMetricStat().getTypeName()) - ); - } + this.metrics = metricEntries; // populates the dummy list of field infos to fetch doc id set iterators for respective fields. - this.fieldInfos = new FieldInfos(getFieldInfoList()); + this.fieldInfos = new FieldInfos(getFieldInfoList(compositeFieldName)); SegmentReadState segmentReadState = new SegmentReadState(state.directory, state.segmentInfo, fieldInfos, state.context); lucene90DocValuesProducer = new Lucene90DocValuesProducer(segmentReadState, dataCodec, dataExtension, metaCodec, metaExtension); } @@ -108,35 +103,36 @@ public void close() throws IOException { this.lucene90DocValuesProducer.close(); } - private FieldInfo[] getFieldInfoList() { + private FieldInfo[] getFieldInfoList(String compositeFieldName) { FieldInfo[] fieldInfoList = new FieldInfo[this.dimensions.size() + metrics.size()]; + // field number is not really used. We depend on unique field names to get the desired iterator int fieldNumber = 0; for (FieldInfo dimension : this.dimensions) { fieldInfoList[fieldNumber] = new FieldInfo( - dimension.getName() + StarTreeConstants.DIMENSION_SUFFIX, + fullFieldNameForStarTreeDimensionsDocValues(compositeFieldName, dimension.getName()), fieldNumber, false, - dimension.omitsNorms(), - dimension.hasPayloads(), - dimension.getIndexOptions(), - dimension.getDocValuesType(), + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, -1, - dimension.attributes(), - dimension.getPointDimensionCount(), - dimension.getPointIndexDimensionCount(), - dimension.getPointNumBytes(), - dimension.getVectorDimension(), - dimension.getVectorEncoding(), - dimension.getVectorSimilarityFunction(), + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, false, - dimension.isParentField() + false ); fieldNumber++; } - for (String metric : metrics) { + for (MetricEntry metric : metrics) { fieldInfoList[fieldNumber] = new FieldInfo( - metric + StarTreeConstants.METRIC_SUFFIX, + fullFieldNameForStarTreeMetricsDocValues(compositeFieldName, metric.getMetricName(), metric.getMetricStat().getTypeName()), fieldNumber, false, false, diff --git a/server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java b/server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java index 07037c188a14c..ecbf3b274b0bb 100644 --- a/server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java +++ b/server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java @@ -9,8 +9,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.Counter; +import org.apache.lucene.util.NumericUtils; import org.opensearch.index.compositeindex.datacube.Dimension; import org.opensearch.index.compositeindex.datacube.Metric; import org.opensearch.index.compositeindex.datacube.MetricStat; @@ -32,12 +35,17 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues; /** * Builder for star tree. Defines the algorithm to construct star-tree @@ -160,15 +168,9 @@ public List getMetricReaders(SegmentWriteState stat for (MetricStat metricType : metric.getMetrics()) { SequentialDocValuesIterator metricReader; FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField()); - if (metricType != MetricStat.COUNT) { - // Need not initialize the metric reader with relevant doc id set iterator for COUNT metric type - metricReader = new SequentialDocValuesIterator( - fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo) - ); - } else { - metricReader = new SequentialDocValuesIterator(); - } - + metricReader = new SequentialDocValuesIterator( + fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo) + ); metricReaders.add(metricReader); } } @@ -178,10 +180,16 @@ public List getMetricReaders(SegmentWriteState stat /** * Builds the star tree from the original segment documents * - * @param fieldProducerMap contain s the docValues producer to get docValues associated with each field + * @param fieldProducerMap contain s the docValues producer to get docValues associated with each field + * @param fieldNumberAcrossStarTrees + * @param starTreeDocValuesConsumer * @throws IOException when we are unable to build star-tree */ - public void build(Map fieldProducerMap) throws IOException { + public void build( + Map fieldProducerMap, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { long startTime = System.currentTimeMillis(); logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName()); @@ -201,17 +209,23 @@ public void build(Map fieldProducerMap) throws IOExce metricReaders ); logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime)); - build(starTreeDocumentIterator); + build(starTreeDocumentIterator, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime)); } /** * Builds the star tree using sorted and aggregated star-tree Documents * - * @param starTreeDocumentIterator contains the sorted and aggregated documents + * @param starTreeDocumentIterator contains the sorted and aggregated documents + * @param fieldNumberAcrossStarTrees + * @param starTreeDocValuesConsumer * @throws IOException when we are unable to build star-tree */ - public void build(Iterator starTreeDocumentIterator) throws IOException { + public void build( + Iterator starTreeDocumentIterator, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { int numSegmentStarTreeDocument = totalSegmentDocs; while (starTreeDocumentIterator.hasNext()) { @@ -238,12 +252,11 @@ public void build(Iterator starTreeDocumentIterator) throws IO int numAggregatedStarTreeDocument = numStarTreeDocs - numStarTreeDocument - numStarTreeDocumentUnderStarNode; logger.debug("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument); - // TODO: When StarTree Codec is ready // Create doc values indices in disk + createSortedDocValuesIndices(starTreeDocValuesConsumer, fieldNumberAcrossStarTrees); + // serialize star-tree serializeStarTree(numSegmentStarTreeDocument); - - // Write star tree metadata for off heap implementation } private void serializeStarTree(int numSegmentStarTreeDocument) throws IOException { @@ -263,6 +276,113 @@ private void serializeStarTree(int numSegmentStarTreeDocument) throws IOExceptio ); } + private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer, AtomicInteger fieldNumberAcrossStarTrees) + throws IOException { + List dimensionWriters = new ArrayList<>(); + List metricWriters = new ArrayList<>(); + FieldInfo[] dimensionFieldInfoList = new FieldInfo[starTreeField.getDimensionsOrder().size()]; + FieldInfo[] metricFieldInfoList = new FieldInfo[metricAggregatorInfos.size()]; + + for (int i = 0; i < dimensionFieldInfoList.length; i++) { + final FieldInfo fi = new FieldInfo( + fullFieldNameForStarTreeDimensionsDocValues(starTreeField.getName(), starTreeField.getDimensionsOrder().get(i).getField()), + fieldNumberAcrossStarTrees.getAndIncrement(), + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false, + false + ); + dimensionFieldInfoList[i] = fi; + dimensionWriters.add(new SortedNumericDocValuesWriter(fi, Counter.newCounter())); + } + for (int i = 0; i < metricAggregatorInfos.size(); i++) { + FieldInfo fi = new FieldInfo( + fullFieldNameForStarTreeMetricsDocValues( + starTreeField.getName(), + metricAggregatorInfos.get(i).getField(), + metricAggregatorInfos.get(i).getMetricStat().getTypeName() + ), + fieldNumberAcrossStarTrees.getAndIncrement(), + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false, + false + ); + metricFieldInfoList[i] = fi; + metricWriters.add(new SortedNumericDocValuesWriter(fi, Counter.newCounter())); + } + + for (int docId = 0; docId < numStarTreeDocs; docId++) { + StarTreeDocument starTreeDocument = getStarTreeDocument(docId); + for (int i = 0; i < starTreeDocument.dimensions.length; i++) { + Long val = starTreeDocument.dimensions[i]; + if (val != null) { + dimensionWriters.get(i).addValue(docId, val); + } + } + + for (int i = 0; i < starTreeDocument.metrics.length; i++) { + try { + switch (metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType()) { + case LONG: + metricWriters.get(i).addValue(docId, (Long) starTreeDocument.metrics[i]); + break; + case DOUBLE: + metricWriters.get(i).addValue(docId, NumericUtils.doubleToSortableLong((Double) starTreeDocument.metrics[i])); + break; + default: + throw new IllegalStateException("Unknown metric doc value type"); + } + } catch (IllegalArgumentException e) { + logger.info("could not parse the value, exiting creation of star tree"); + } + } + } + + addStarTreeDocValueFields(docValuesConsumer, dimensionWriters, dimensionFieldInfoList, starTreeField.getDimensionsOrder().size()); + addStarTreeDocValueFields(docValuesConsumer, metricWriters, metricFieldInfoList, metricAggregatorInfos.size()); + } + + private void addStarTreeDocValueFields( + DocValuesConsumer docValuesConsumer, + List docValuesWriters, + FieldInfo[] fieldInfoList, + int fieldCount + ) throws IOException { + for (int i = 0; i < fieldCount; i++) { + final int increment = i; + DocValuesProducer docValuesProducer = new EmptyDocValuesProducer() { + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) { + return docValuesWriters.get(increment).getDocValues(); + } + }; + docValuesConsumer.addSortedNumericField(fieldInfoList[i], docValuesProducer); + } + } + /** * Adds a document to the star-tree. * diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index ceb2559a0e16c..e4f5e46950b5f 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -105,7 +105,7 @@ public class FeatureFlags { * aggregations. */ public static final String STAR_TREE_INDEX = "opensearch.experimental.feature.composite_index.star_tree.enabled"; - public static final Setting STAR_TREE_INDEX_SETTING = Setting.boolSetting(STAR_TREE_INDEX, false, Property.NodeScope); + public static final Setting STAR_TREE_INDEX_SETTING = Setting.boolSetting(STAR_TREE_INDEX, true, Property.NodeScope); private static final List> ALL_FEATURE_FLAG_SETTINGS = List.of( REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING, diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesReader.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesReader.java index 0c9c5d1d91063..62f6a12c88975 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesReader.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesReader.java @@ -25,8 +25,8 @@ import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.util.IOUtils; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.CompositeIndexMetadata; import org.opensearch.index.compositeindex.datacube.Dimension; @@ -34,13 +34,12 @@ import org.opensearch.index.compositeindex.datacube.Metric; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration; -import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry; import org.opensearch.index.compositeindex.datacube.startree.meta.StarTreeMetadata; import org.opensearch.index.compositeindex.datacube.startree.node.OffHeapStarTree; import org.opensearch.index.compositeindex.datacube.startree.node.StarTree; import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode; -import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeConstants; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper; import java.io.IOException; import java.util.ArrayList; @@ -61,8 +60,8 @@ public class Composite90DocValuesReader extends DocValuesProducer implements Com private static final Logger logger = LogManager.getLogger(CompositeIndexMetadata.class); private final DocValuesProducer delegate; - private final IndexInput dataIn; - private final ChecksumIndexInput metaIn; + private IndexInput dataIn; + private ChecksumIndexInput metaIn; private final Map starTreeMap = new LinkedHashMap<>(); private final Map compositeIndexMetadataMap = new LinkedHashMap<>(); private final Map compositeDocValuesProducerMap = new LinkedHashMap<>(); @@ -195,16 +194,30 @@ public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { @Override public void checkIntegrity() throws IOException { delegate.checkIntegrity(); - CodecUtil.checksumEntireFile(metaIn); CodecUtil.checksumEntireFile(dataIn); } @Override public void close() throws IOException { delegate.close(); - starTreeMap.clear(); - compositeIndexMetadataMap.clear(); - compositeDocValuesProducerMap.clear(); + boolean success = false; + try { + IOUtils.close(metaIn, dataIn); + for (DocValuesProducer docValuesProducer : compositeDocValuesProducerMap.values()) { + IOUtils.close(docValuesProducer); + } + success = true; + } finally { + if (!success) { + + IOUtils.closeWhileHandlingException(metaIn, dataIn); + } + starTreeMap.clear(); + compositeIndexMetadataMap.clear(); + compositeDocValuesProducerMap.clear(); + metaIn = null; + dataIn = null; + } } @Override @@ -263,13 +276,15 @@ public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo comp for (String dimension : dimensions) { dimensionsDocIdSetIteratorMap.put( dimension, - starTree99DocValuesProducer.getSortedNumeric(dimension + StarTreeConstants.DIMENSION_SUFFIX) + starTree99DocValuesProducer.getSortedNumeric( + StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues(starTreeField.getName(), dimension) + ) ); } for (MetricEntry metricEntry : starTreeMetadata.getMetricEntries()) { - String metricFullName = MetricAggregatorInfo.toFieldName( - compositeIndexFieldInfo.getField(), + String metricFullName = StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues( + starTreeField.getName(), metricEntry.getMetricName(), metricEntry.getMetricStat().getTypeName() ); diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesWriter.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesWriter.java index 97dec4bd55c88..68740cad60a14 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesWriter.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesWriter.java @@ -13,13 +13,14 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.lucene90.Composite99DocValuesConsumer; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.MergeState; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.util.IOUtils; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder; @@ -27,7 +28,7 @@ import org.opensearch.index.mapper.MapperService; import java.io.IOException; -import java.util.Collections; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -48,6 +49,7 @@ public class Composite90DocValuesWriter extends DocValuesConsumer { private MergeState mergeState = null; private final Set compositeMappedFieldTypes; private final Set compositeFieldSet; + private DocValuesConsumer composite99DocValuesConsumer; public IndexOutput dataOut; public IndexOutput metaOut; @@ -58,8 +60,25 @@ public class Composite90DocValuesWriter extends DocValuesConsumer { public Composite90DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) throws IOException { + this.delegate = delegate; + this.state = segmentWriteState; + this.mapperService = mapperService; + this.compositeMappedFieldTypes = mapperService.getCompositeFieldTypes(); + compositeFieldSet = new HashSet<>(); + for (CompositeMappedFieldType type : compositeMappedFieldTypes) { + compositeFieldSet.addAll(type.fields()); + } + boolean success = false; try { + this.composite99DocValuesConsumer = new Composite99DocValuesConsumer( + segmentWriteState, + Composite90DocValuesFormat.DATA_DOC_VALUES_CODEC, + Composite90DocValuesFormat.DATA_DOC_VALUES_EXTENSION, + Composite90DocValuesFormat.META_DOC_VALUES_CODEC, + Composite90DocValuesFormat.META_DOC_VALUES_EXTENSION + ); + String dataFileName = IndexFileNames.segmentFileName( segmentWriteState.segmentInfo.name, segmentWriteState.segmentSuffix, @@ -94,15 +113,6 @@ public Composite90DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState IOUtils.closeWhileHandlingException(this); } } - - this.delegate = delegate; - this.state = segmentWriteState; - this.mapperService = mapperService; - this.compositeMappedFieldTypes = mapperService.getCompositeFieldTypes(); - compositeFieldSet = new HashSet<>(); - for (CompositeMappedFieldType type : compositeMappedFieldTypes) { - compositeFieldSet.addAll(type.fields()); - } } @Override @@ -146,14 +156,16 @@ public void close() throws IOException { if (dataOut != null) { CodecUtil.writeFooter(dataOut); // write checksum } + success = true; } finally { if (success) { - IOUtils.close(dataOut, metaOut); + IOUtils.close(dataOut, metaOut, composite99DocValuesConsumer); } else { - IOUtils.closeWhileHandlingException(dataOut, metaOut); + IOUtils.closeWhileHandlingException(dataOut, metaOut, composite99DocValuesConsumer); } metaOut = dataOut = null; + composite99DocValuesConsumer = null; } } @@ -167,11 +179,10 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer, if (compositeFieldSet.isEmpty()) { for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) { if (mappedType.getCompositeIndexType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) { - // TODO : Call StarTree builder + StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService); + starTreesBuilder.build(metaOut, dataOut, fieldProducerMap, composite99DocValuesConsumer); } } - StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService); - starTreesBuilder.build(metaOut, dataOut, fieldProducerMap); } } @@ -216,7 +227,7 @@ private void mergeStarTreeFields(MergeState mergeState) throws IOException { if (fieldInfo.getType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) { CompositeIndexValues compositeIndexValues = reader.getCompositeIndexValues(fieldInfo); if (compositeIndexValues instanceof StarTreeValues) { - List fieldsList = starTreeSubsPerField.getOrDefault(fieldInfo.getField(), Collections.emptyList()); + List fieldsList = starTreeSubsPerField.getOrDefault(fieldInfo.getField(), new ArrayList<>()); if (!starTreeFieldMap.containsKey(fieldInfo.getField())) { starTreeFieldMap.put(fieldInfo.getField(), ((StarTreeValues) compositeIndexValues).getStarTreeField()); } @@ -232,7 +243,7 @@ private void mergeStarTreeFields(MergeState mergeState) throws IOException { } } } - final StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService); - starTreesBuilder.buildDuringMerge(metaOut, dataOut, starTreeFieldMap, starTreeSubsPerField); + StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService); + starTreesBuilder.buildDuringMerge(metaOut, dataOut, starTreeFieldMap, starTreeSubsPerField, composite99DocValuesConsumer); } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java index a3aa2f2fdde62..9b3c633280b08 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java @@ -84,8 +84,8 @@ public String toFieldName() { } - public static String toFieldName(String starFieldName, String field, String typeName) { - return starFieldName + DELIMITER + field + DELIMITER + typeName; + public static String toFieldName(String starFieldName, String field, String metricName) { + return starFieldName + DELIMITER + field + DELIMITER + metricName; } @Override @@ -100,7 +100,7 @@ public boolean equals(Object obj) { } if (obj instanceof MetricAggregatorInfo) { MetricAggregatorInfo anotherPair = (MetricAggregatorInfo) obj; - return metricStat == anotherPair.metricStat && field.equals(anotherPair.field); + return metricStat.equals(anotherPair.metricStat) && field.equals(anotherPair.field); } return false; } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java index 00cce31dc0771..d28607c067a61 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java @@ -7,6 +7,7 @@ */ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.index.BaseStarTreeBuilder; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.search.DocIdSetIterator; @@ -26,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; /** * On heap based single tree builder @@ -61,8 +63,12 @@ public void appendStarTreeDocument(StarTreeDocument starTreeDocument) throws IOE } @Override - public void build(List starTreeValuesSubs) throws IOException { - build(mergeStarTrees(starTreeValuesSubs)); + public void build( + List starTreeValuesSubs, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { + build(mergeStarTrees(starTreeValuesSubs), fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); } /** diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java index 988379d7400fe..4c0e8ad8938e7 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; @@ -16,6 +17,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * A star-tree builder that builds a single star-tree. @@ -28,18 +30,29 @@ public interface StarTreeBuilder extends Closeable { /** * Builds the star tree from the original segment documents * - * @param fieldProducerMap contains the docValues producer to get docValues associated with each field - * + * @param fieldProducerMap contains the docValues producer to get docValues associated with each field + * @param fieldNumberAcrossStarTrees maintains the unique field number across the fields in the star tree + * @param starTreeDocValuesConsumer * @throws IOException when we are unable to build star-tree */ - void build(Map fieldProducerMap) throws IOException; + void build( + Map fieldProducerMap, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException; /** * Builds the star tree using StarTree values from multiple segments * - * @param starTreeValuesSubs contains the star tree values from multiple segments + * @param starTreeValuesSubs contains the star tree values from multiple segments + * @param fieldNumberAcrossStarTrees maintains the unique field number across the fields in the star tree + * @param starTreeDocValuesConsumer * @throws IOException when we are unable to build star-tree */ - void build(List starTreeValuesSubs) throws IOException; + void build( + List starTreeValuesSubs, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java index 6cb4ca81e07c8..e4bf365332e7a 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.store.IndexOutput; @@ -26,6 +27,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * Builder to construct star-trees based on multiple star-tree fields. @@ -40,6 +42,7 @@ public class StarTreesBuilder implements Closeable { private final List starTreeFields; private final SegmentWriteState state; private final MapperService mapperService; + private AtomicInteger fieldNumberAcrossStarTrees; public StarTreesBuilder(SegmentWriteState segmentWriteState, MapperService mapperService) { List starTreeFields = new ArrayList<>(); @@ -59,12 +62,18 @@ public StarTreesBuilder(SegmentWriteState segmentWriteState, MapperService mappe this.starTreeFields = starTreeFields; this.state = segmentWriteState; this.mapperService = mapperService; + this.fieldNumberAcrossStarTrees = new AtomicInteger(); } /** * Builds the star-trees. */ - public void build(IndexOutput metaOut, IndexOutput dataOut, Map fieldProducerMap) throws IOException { + public void build( + IndexOutput metaOut, + IndexOutput dataOut, + Map fieldProducerMap, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { if (starTreeFields.isEmpty()) { logger.debug("no star-tree fields found, returning from star-tree builder"); return; @@ -78,7 +87,7 @@ public void build(IndexOutput metaOut, IndexOutput dataOut, Map starTreeFieldMap, - final Map> starTreeValuesSubsPerField + final Map> starTreeValuesSubsPerField, + DocValuesConsumer starTreeDocValuesConsumer ) throws IOException { for (Map.Entry> entry : starTreeValuesSubsPerField.entrySet()) { List starTreeValuesList = entry.getValue(); StarTreeField starTreeField = starTreeFieldMap.get(entry.getKey()); StarTreeBuilder builder = getSingleTreeBuilder(metaOut, dataOut, starTreeField, state, mapperService); - builder.build(starTreeValuesList); + builder.build(starTreeValuesList, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTree.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTree.java index 1f16bd924dac6..c34cdc0b45ea4 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTree.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTree.java @@ -41,10 +41,7 @@ public OffHeapStarTree(IndexInput data, StarTreeMetadata starTreeMetadata) throw } numNodes = data.readInt(); // num nodes - RandomAccessInput in = data.randomAccessSlice( - starTreeMetadata.getDataStartFilePointer(), - starTreeMetadata.getDataStartFilePointer() + starTreeMetadata.getDataLength() - ); + RandomAccessInput in = data.randomAccessSlice(data.getFilePointer(), starTreeMetadata.getDataLength()); root = new OffHeapStarTreeNode(in, 0); } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTreeNode.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTreeNode.java index 91ad56ef70eb3..ec30f14f044d4 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTreeNode.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTreeNode.java @@ -18,16 +18,17 @@ * @opensearch.experimental */ public class OffHeapStarTreeNode implements StarTreeNode { - public static final int NUM_INT_SERIALIZABLE_FIELDS = 7; + public static final int NUM_INT_SERIALIZABLE_FIELDS = 6; public static final int NUM_LONG_SERIALIZABLE_FIELDS = 1; + public static final int NUM_BYTE_SERIALIZABLE_FIELDS = 1; public static final long SERIALIZABLE_DATA_SIZE_IN_BYTES = (Integer.BYTES * NUM_INT_SERIALIZABLE_FIELDS) + (Long.BYTES - * NUM_LONG_SERIALIZABLE_FIELDS); + * NUM_LONG_SERIALIZABLE_FIELDS) + (NUM_BYTE_SERIALIZABLE_FIELDS * Byte.BYTES); private static final int DIMENSION_ID_OFFSET = 0; private static final int DIMENSION_VALUE_OFFSET = DIMENSION_ID_OFFSET + Integer.BYTES; private static final int START_DOC_ID_OFFSET = DIMENSION_VALUE_OFFSET + Long.BYTES; private static final int END_DOC_ID_OFFSET = START_DOC_ID_OFFSET + Integer.BYTES; private static final int AGGREGATE_DOC_ID_OFFSET = END_DOC_ID_OFFSET + Integer.BYTES; - private static final int IS_STAR_NODE_OFFSET = AGGREGATE_DOC_ID_OFFSET + Integer.BYTES; + private static final int IS_STAR_NODE_OFFSET = AGGREGATE_DOC_ID_OFFSET + Byte.BYTES; private static final int FIRST_CHILD_ID_OFFSET = IS_STAR_NODE_OFFSET + Integer.BYTES; private static final int LAST_CHILD_ID_OFFSET = FIRST_CHILD_ID_OFFSET + Integer.BYTES; diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeConstants.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeConstants.java deleted file mode 100644 index 2670b7b41f4df..0000000000000 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeConstants.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.compositeindex.datacube.startree.utils; - -/** - * This class contains constant values used throughout the Star Tree index implementation. - * - * @opensearch.experimental - */ -public class StarTreeConstants { - - /** - * The suffix appended to dimension field names in the Star Tree index. - */ - public static final String DIMENSION_SUFFIX = "_dim"; - - /** - * The suffix appended to metric field names in the Star Tree index. - */ - public static final String METRIC_SUFFIX = "_metric"; - -} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeHelper.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeHelper.java new file mode 100644 index 0000000000000..8cbb7508ee1ef --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeHelper.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.utils; + +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; + +/** + * This class contains helper methods used throughout the Star Tree index implementation. + * + * @opensearch.experimental + */ +public class StarTreeHelper { + + /** + * The suffix appended to dimension field names in the Star Tree index. + */ + public static final String DIMENSION_SUFFIX = "dim"; + + /** + * The suffix appended to metric field names in the Star Tree index. + */ + public static final String METRIC_SUFFIX = "metric"; + + public static String fullFieldNameForStarTreeDimensionsDocValues(String starTreeFieldName, String dimensionName) { + return starTreeFieldName + "_" + dimensionName + "_" + DIMENSION_SUFFIX; + } + + public static String fullFieldNameForStarTreeMetricsDocValues(String name, String fieldName, String metricName) { + return MetricAggregatorInfo.toFieldName(name, fieldName, metricName) + "_" + METRIC_SUFFIX; + } + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeMetaSerializer.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeMetaSerializer.java index dc91c97fb224f..e1091ba35d3f8 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeMetaSerializer.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeMetaSerializer.java @@ -100,7 +100,7 @@ private static long computeMetricEntriesSizeInBytes(List m long totalMetricEntriesSize = 0; for (MetricAggregatorInfo metricAggregatorInfo : metricAggregatorInfos) { - totalMetricEntriesSize += metricAggregatorInfo.getMetric().getBytes(UTF_8).length; + totalMetricEntriesSize += metricAggregatorInfo.getField().getBytes(UTF_8).length; totalMetricEntriesSize += metricAggregatorInfo.getMetricStat().getTypeName().getBytes(UTF_8).length; } @@ -189,7 +189,7 @@ private static void writeMeta( // metric - metric stat pair for (MetricAggregatorInfo metricAggregatorInfo : metricAggregatorInfos) { - String metricName = metricAggregatorInfo.getMetric(); + String metricName = metricAggregatorInfo.getField(); String metricStatName = metricAggregatorInfo.getMetricStat().getTypeName(); metaOut.writeString(metricName); metaOut.writeString(metricStatName); @@ -202,7 +202,7 @@ private static void writeMeta( metaOut.writeInt(starTreeField.getStarTreeConfig().maxLeafDocs()); // number of skip star node creation dimensions - metaOut.writeInt(starTreeField.getStarTreeConfig().maxLeafDocs()); + metaOut.writeInt(starTreeField.getStarTreeConfig().getSkipStarNodeCreationInDims().size()); // skip star node creations for (String dimension : starTreeField.getStarTreeConfig().getSkipStarNodeCreationInDims()) { diff --git a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java index 91313092d8d28..262937e165c8d 100644 --- a/server/src/main/java/org/opensearch/index/query/QueryShardContext.java +++ b/server/src/main/java/org/opensearch/index/query/QueryShardContext.java @@ -56,7 +56,11 @@ import org.opensearch.index.IndexSortConfig; import org.opensearch.index.analysis.IndexAnalyzers; import org.opensearch.index.cache.bitset.BitsetFilterCache; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.fielddata.IndexFieldData; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; import org.opensearch.index.mapper.ContentPath; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; @@ -73,12 +77,17 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptFactory; import org.opensearch.script.ScriptService; +import org.opensearch.search.aggregations.AggregatorFactory; +import org.opensearch.search.aggregations.metrics.SumAggregatorFactory; import org.opensearch.search.aggregations.support.AggregationUsageService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.lookup.SearchLookup; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import org.opensearch.transport.RemoteClusterAware; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -89,6 +98,7 @@ import java.util.function.LongSupplier; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -522,6 +532,66 @@ private ParsedQuery toQuery(QueryBuilder queryBuilder, CheckedFunction>> predicateMap = getStarTreePredicates(queryBuilder); + StarTreeQuery starTreeQuery = new StarTreeQuery(starTree, predicateMap, null); + OriginalOrStarTreeQuery originalOrStarTreeQuery = new OriginalOrStarTreeQuery(starTreeQuery, query); + return new ParsedQuery(originalOrStarTreeQuery); + } + + /** + * Parse query body to star-tree predicates + * @param queryBuilder + * @return + */ + private Map>> getStarTreePredicates(QueryBuilder queryBuilder) { + // Assuming the following variables have been initialized: + Map>> predicateMap = new HashMap<>(); + + // Check if the query builder is an instance of TermQueryBuilder + if (queryBuilder instanceof TermQueryBuilder) { + TermQueryBuilder tq = (TermQueryBuilder) queryBuilder; + String field = tq.fieldName(); + long inputQueryVal = Long.parseLong(tq.value().toString()); + + // Get or create the list of predicates for the given field + List> predicates = predicateMap.getOrDefault(field, new ArrayList<>()); + + // Create a predicate to match the input query value + Predicate predicate = dimVal -> dimVal == inputQueryVal; + predicates.add(predicate); + + // Put the predicates list back into the map + predicateMap.put(field, predicates); + } else { + throw new IllegalArgumentException("The query is not a term query"); + } + return predicateMap; + + } + + public boolean validateStarTreeMetricSuport(CompositeDataCubeFieldType compositeIndexFieldInfo, AggregatorFactory aggregatorFactory) { + String field = null; + Map> supportedMetrics = compositeIndexFieldInfo.getMetrics() + .stream() + .collect(Collectors.toMap(Metric::getField, Metric::getMetrics)); + + // Existing support only for MetricAggregators without sub-aggregations + if (aggregatorFactory.getSubFactories().getFactories().length != 0) { + return false; + } + + // TODO: increment supported aggregation type + if (aggregatorFactory instanceof SumAggregatorFactory) { + field = ((SumAggregatorFactory) aggregatorFactory).getField(); + if (supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(MetricStat.SUM)) { + return true; + } + } + + return false; + } + public Index index() { return indexSettings.getIndex(); } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index a53a7198c366f..bfb1376f0ff02 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -77,12 +77,16 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; import org.opensearch.index.engine.Engine; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; import org.opensearch.index.mapper.DerivedFieldResolver; import org.opensearch.index.mapper.DerivedFieldResolverFactory; +import org.opensearch.index.mapper.StarTreeMapper; import org.opensearch.index.query.InnerHitContextBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.MatchNoneQueryBuilder; +import org.opensearch.index.query.ParsedQuery; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.query.QueryShardContext; @@ -97,11 +101,13 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.AggregationInitializationException; import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.AggregatorFactory; import org.opensearch.search.aggregations.InternalAggregation; import org.opensearch.search.aggregations.InternalAggregation.ReduceContext; import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; +import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; import org.opensearch.search.dfs.DfsPhase; @@ -1314,6 +1320,11 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc context.evaluateRequestShouldUseConcurrentSearch(); return; } + // Can be marked false for majority cases for which star-tree cannot be used + // Will save checking the criteria later and we can have a limit on what search requests are supported + // As we increment the cases where star-tree can be used, this can be set back to true + boolean canUseStarTree = context.mapperService().isCompositeIndexPresent(); + SearchShardTarget shardTarget = context.shardTarget(); QueryShardContext queryShardContext = context.getQueryShardContext(); context.from(source.from()); @@ -1339,9 +1350,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.sorts() != null) { try { Optional optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext()); - if (optionalSort.isPresent()) { - context.sort(optionalSort.get()); - } + optionalSort.ifPresent(context::sort); } catch (IOException e) { throw new SearchException(shardTarget, "failed to create sort elements", e); } @@ -1496,6 +1505,46 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.profile()) { context.setProfilers(new Profilers(context.searcher(), context.shouldUseConcurrentSearch())); } + + if (canUseStarTree) { + try { + setStarTreeQuery(context, queryShardContext, source); + logger.info("using star tree"); + } catch (IOException e) { + logger.info("not using star tree"); + } + } + } + + private boolean setStarTreeQuery(SearchContext context, QueryShardContext queryShardContext, SearchSourceBuilder source) + throws IOException { + + if (source.aggregations() == null) { + return false; + } + + // TODO: Support for multiple startrees + CompositeDataCubeFieldType compositeMappedFieldType = (StarTreeMapper.StarTreeFieldType) context.mapperService() + .getCompositeFieldTypes() + .iterator() + .next(); + CompositeIndexFieldInfo starTree = new CompositeIndexFieldInfo( + compositeMappedFieldType.name(), + compositeMappedFieldType.getCompositeIndexType() + ); + + ParsedQuery parsedQuery = queryShardContext.toStarTreeQuery(starTree, source.query(), context.query()); + AggregatorFactory aggregatorFactory = context.aggregations().factories().getFactories()[0]; + if (!(aggregatorFactory instanceof ValuesSourceAggregatorFactory + && aggregatorFactory.getSubFactories().getFactories().length == 0)) { + return false; + } + + if (queryShardContext.validateStarTreeMetricSuport(compositeMappedFieldType, aggregatorFactory)) { + context.parsedQuery(parsedQuery); + } + + return true; } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java index eeb0c606694b0..dfcb245ef3656 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactories.java @@ -255,7 +255,7 @@ public static Builder builder() { return new Builder(); } - private AggregatorFactories(AggregatorFactory[] factories) { + public AggregatorFactories(AggregatorFactory[] factories) { this.factories = factories; } @@ -661,4 +661,8 @@ public PipelineTree buildPipelineTree() { return new PipelineTree(subTrees, aggregators); } } + + public AggregatorFactory[] getFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java index 6cc3a78fb1e36..86fbb46a9ad3c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorFactory.java @@ -127,4 +127,8 @@ protected boolean supportsConcurrentSegmentSearch() { public boolean evaluateChildFactories() { return factories.allFactoriesSupportConcurrentSearch(); } + + public AggregatorFactories getSubFactories() { + return factories; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java index f90e5a092385f..9802e3fdae50d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/NumericMetricsAggregator.java @@ -31,13 +31,20 @@ package org.opensearch.search.aggregations.metrics; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.Comparators; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.sort.SortOrder; import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; /** * Base class to aggregate all docs into a single numeric metric value. @@ -107,4 +114,14 @@ public BucketComparator bucketComparator(String key, SortOrder order) { return (lhs, rhs) -> Comparators.compareDiscardNaN(metric(key, lhs), metric(key, rhs), order == SortOrder.ASC); } } + + protected StarTreeValues getStarTreeValues(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException { + SegmentReader reader = Lucene.segmentReader(ctx.reader()); + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) return null; + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + StarTreeValues values = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); + final AtomicReference aggrVal = new AtomicReference<>(null); + + return values; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index 4b8e882cd69bc..aa71224588c2c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -36,6 +36,8 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.DoubleArray; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.fielddata.SortedNumericDoubleValues; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; @@ -45,6 +47,8 @@ import org.opensearch.search.aggregations.support.ValuesSource; import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.startree.OriginalOrStarTreeQuery; +import org.opensearch.search.startree.StarTreeQuery; import java.io.IOException; import java.util.Map; @@ -56,13 +60,13 @@ */ public class SumAggregator extends NumericMetricsAggregator.SingleValue { - private final ValuesSource.Numeric valuesSource; - private final DocValueFormat format; + protected final ValuesSource.Numeric valuesSource; + protected final DocValueFormat format; - private DoubleArray sums; - private DoubleArray compensations; + protected DoubleArray sums; + protected DoubleArray compensations; - SumAggregator( + public SumAggregator( String name, ValuesSourceConfig valuesSourceConfig, SearchContext context, @@ -86,6 +90,14 @@ public ScoreMode scoreMode() { @Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { + if (context.query() instanceof OriginalOrStarTreeQuery && ((OriginalOrStarTreeQuery) context.query()).isStarTreeUsed()) { + StarTreeQuery starTreeQuery = ((OriginalOrStarTreeQuery) context.query()).getStarTreeQuery(); + return getStarTreeLeafCollector(ctx, sub, starTreeQuery.getStarTree()); + } + return getDefaultLeafCollector(ctx, sub); + } + + private LeafBucketCollector getDefaultLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { if (valuesSource == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } @@ -118,6 +130,28 @@ public void collect(int doc, long bucket) throws IOException { }; } + private LeafBucketCollector getStarTreeLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree) + throws IOException { + final BigArrays bigArrays = context.bigArrays(); + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + + StarTreeValues starTreeValues = getStarTreeValues(ctx, starTree); + + // + String fieldName = ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(); + + return new LeafBucketCollectorBase(sub, starTreeValues) { + @Override + public void collect(int doc, long bucket) throws IOException { + // TODO: Fix the response for collecting star tree sum + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); + } + }; + } + @Override public double metric(long owningBucketOrd) { if (valuesSource == null || owningBucketOrd >= sums.size()) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java index ef9b93920ba18..e0cd44f2672a8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregatorFactory.java @@ -52,7 +52,7 @@ * * @opensearch.internal */ -class SumAggregatorFactory extends ValuesSourceAggregatorFactory { +public class SumAggregatorFactory extends ValuesSourceAggregatorFactory { SumAggregatorFactory( String name, diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java index 1f4dd429e094e..5732d545cb2d2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSource.java @@ -625,6 +625,10 @@ public SortedNumericDocValues longValues(LeafReaderContext context) { public SortedNumericDoubleValues doubleValues(LeafReaderContext context) { return indexFieldData.load(context).getDoubleValues(); } + + public String getIndexFieldName() { + return indexFieldData.getFieldName(); + } } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java index 69a4a5d8b6703..b19e466b081f9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/support/ValuesSourceAggregatorFactory.java @@ -102,4 +102,16 @@ protected abstract Aggregator doCreateInternal( public String getStatsSubtype() { return config.valueSourceType().typeName(); } + + public String getField() { + return config.fieldContext().field(); + } + + public String getAggregationName() { + return name; + } + + public ValuesSourceConfig getConfig() { + return config; + } } diff --git a/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java b/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java new file mode 100644 index 0000000000000..bc8ef51bfb537 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/OriginalOrStarTreeQuery.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Accountable; + +import java.io.IOException; + +/** + * Preserves star-tree queries which can be used along with original query + * Decides which star-tree query to use (or not) based on cost factors + */ +public class OriginalOrStarTreeQuery extends Query implements Accountable { + + private final StarTreeQuery starTreeQuery; + private final Query originalQuery; + private boolean starTreeQueryUsed; + + public OriginalOrStarTreeQuery(StarTreeQuery starTreeQuery, Query originalQuery) { + this.starTreeQuery = starTreeQuery; + this.originalQuery = originalQuery; + this.starTreeQueryUsed = false; + } + + @Override + public String toString(String s) { + return ""; + } + + @Override + public void visit(QueryVisitor queryVisitor) { + + } + + @Override + public boolean equals(Object o) { + return false; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + + public boolean isStarTreeUsed() { + return starTreeQueryUsed; + } + + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + if (searcher.getIndexReader().hasDeletions() == false) { + this.starTreeQueryUsed = true; + return this.starTreeQuery.createWeight(searcher, scoreMode, boost); + } else { + return this.originalQuery.createWeight(searcher, scoreMode, boost); + } + } + + public Query getOriginalQuery() { + return originalQuery; + } + + public StarTreeQuery getStarTreeQuery() { + return starTreeQuery; + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java new file mode 100644 index 0000000000000..a3a000703a3b8 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeFilter.java @@ -0,0 +1,294 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.DocIdSetBuilder; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.function.Predicate; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +// TODO: Code inspired from a POC - need to review this once +/** Filter operator for star tree data structure. */ +public class StarTreeFilter { + private static final Logger logger = LogManager.getLogger(StarTreeFilter.class); + + /** Helper class to wrap the result from traversing the star tree. */ + static class StarTreeResult { + final DocIdSetBuilder _matchedDocIds; + final Set _remainingPredicateColumns; + final int numOfMatchedDocs; + final int maxMatchedDoc; + + StarTreeResult(DocIdSetBuilder matchedDocIds, Set remainingPredicateColumns, int numOfMatchedDocs, int maxMatchedDoc) { + _matchedDocIds = matchedDocIds; + _remainingPredicateColumns = remainingPredicateColumns; + this.numOfMatchedDocs = numOfMatchedDocs; + this.maxMatchedDoc = maxMatchedDoc; + } + } + + private final StarTreeNode starTreeRoot; + + Map>> _predicateEvaluators; + // private final List _groupByColumns; + + DocIdSetBuilder docsWithField; + + DocIdSetBuilder.BulkAdder adder; + Map dimValueMap; + + public StarTreeFilter( + StarTreeValues starTreeAggrStructure, + Map>> predicateEvaluators, + List groupByColumns + ) { + // This filter operator does not support AND/OR/NOT operations. + starTreeRoot = starTreeAggrStructure.getRoot(); + dimValueMap = starTreeAggrStructure.getDimensionDocValuesIteratorMap(); + _predicateEvaluators = predicateEvaluators != null ? predicateEvaluators : Collections.emptyMap(); + // _groupByColumns = groupByColumns != null ? groupByColumns : Collections.emptyList(); + + // TODO : this should be the maximum number of doc values + docsWithField = new DocIdSetBuilder(Integer.MAX_VALUE); + } + + /** + *
    + *
  • First go over the star tree and try to match as many dimensions as possible + *
  • For the remaining columns, use doc values indexes to match them + *
+ */ + public DocIdSetIterator getStarTreeResult() throws IOException { + StarTreeResult starTreeResult = traverseStarTree(); + List andIterators = new ArrayList<>(); + andIterators.add(starTreeResult._matchedDocIds.build().iterator()); + DocIdSetIterator docIdSetIterator = andIterators.get(0); + // No matches, return + if (starTreeResult.maxMatchedDoc == -1) { + return docIdSetIterator; + } + int docCount = 0; + for (String remainingPredicateColumn : starTreeResult._remainingPredicateColumns) { + // TODO : set to max value of doc values + logger.info("remainingPredicateColumn : {}, maxMatchedDoc : {} ", remainingPredicateColumn, starTreeResult.maxMatchedDoc); + DocIdSetBuilder builder = new DocIdSetBuilder(starTreeResult.maxMatchedDoc + 1); + List> compositePredicateEvaluators = _predicateEvaluators.get(remainingPredicateColumn); + SortedNumericDocValues ndv = (SortedNumericDocValues) this.dimValueMap.get(remainingPredicateColumn); + List docIds = new ArrayList<>(); + while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docCount++; + int docID = docIdSetIterator.docID(); + if (ndv.advanceExact(docID)) { + final int valuesCount = ndv.docValueCount(); + long value = ndv.nextValue(); + for (Predicate compositePredicateEvaluator : compositePredicateEvaluators) { + // TODO : this might be expensive as its done against all doc values docs + if (compositePredicateEvaluator.test(value)) { + docIds.add(docID); + for (int i = 0; i < valuesCount - 1; i++) { + while (docIdSetIterator.nextDoc() != NO_MORE_DOCS) { + docIds.add(docIdSetIterator.docID()); + } + } + break; + } + } + } + } + DocIdSetBuilder.BulkAdder adder = builder.grow(docIds.size()); + for (int docID : docIds) { + adder.add(docID); + } + docIdSetIterator = builder.build().iterator(); + } + return docIdSetIterator; + } + + /** + * Helper method to traverse the star tree, get matching documents and keep track of all the + * predicate dimensions that are not matched. + */ + private StarTreeResult traverseStarTree() throws IOException { + Set globalRemainingPredicateColumns = null; + + StarTreeNode starTree = starTreeRoot; + + List dimensionNames = new ArrayList<>(dimValueMap.keySet()); + + // Track whether we have found a leaf node added to the queue. If we have found a leaf node, and + // traversed to the + // level of the leave node, we can set globalRemainingPredicateColumns if not already set + // because we know the leaf + // node won't split further on other predicate columns. + boolean foundLeafNode = starTree.isLeaf(); + + // Use BFS to traverse the star tree + Queue queue = new ArrayDeque<>(); + queue.add(starTree); + int currentDimensionId = -1; + Set remainingPredicateColumns = new HashSet<>(_predicateEvaluators.keySet()); + // Set remainingGroupByColumns = new HashSet<>(_groupByColumns); + if (foundLeafNode) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + + int matchedDocsCountInStarTree = 0; + int maxDocNum = -1; + + StarTreeNode starTreeNode; + List docIds = new ArrayList<>(); + while ((starTreeNode = queue.poll()) != null) { + int dimensionId = starTreeNode.getDimensionId(); + if (dimensionId > currentDimensionId) { + // Previous level finished + String dimension = dimensionNames.get(dimensionId); + remainingPredicateColumns.remove(dimension); + // remainingGroupByColumns.remove(dimension); + if (foundLeafNode && globalRemainingPredicateColumns == null) { + globalRemainingPredicateColumns = new HashSet<>(remainingPredicateColumns); + } + currentDimensionId = dimensionId; + } + + // If all predicate columns columns are matched, we can use aggregated document + if (remainingPredicateColumns.isEmpty()) { + int docId = starTreeNode.getAggregatedDocId(); + docIds.add(docId); + matchedDocsCountInStarTree++; + maxDocNum = Math.max(docId, maxDocNum); + continue; + } + + // For leaf node, because we haven't exhausted all predicate columns and group-by columns, we + // cannot use + // the aggregated document. Add the range of documents for this node to the bitmap, and keep + // track of the + // remaining predicate columns for this node + if (starTreeNode.isLeaf()) { + for (long i = starTreeNode.getStartDocId(); i < starTreeNode.getEndDocId(); i++) { + docIds.add((int) i); + matchedDocsCountInStarTree++; + maxDocNum = Math.max((int) i, maxDocNum); + } + continue; + } + + // For non-leaf node, proceed to next level + String childDimension = dimensionNames.get(dimensionId + 1); + + // Only read star-node when the dimension is not in the global remaining predicate columns or + // group-by columns + // because we cannot use star-node in such cases + StarTreeNode starNode = null; + if ((globalRemainingPredicateColumns == null || !globalRemainingPredicateColumns.contains(childDimension))) { + starNode = starTreeNode.getChildForDimensionValue(StarTreeNode.ALL); + } + + if (remainingPredicateColumns.contains(childDimension)) { + // Have predicates on the next level, add matching nodes to the queue + + // Calculate the matching dictionary ids for the child dimension + int numChildren = starTreeNode.getNumChildren(); + + // If number of matching dictionary ids is large, use scan instead of binary search + + Iterator childrenIterator = starTreeNode.getChildrenIterator(); + + // When the star-node exists, and the number of matching doc ids is more than or equal to + // the + // number of non-star child nodes, check if all the child nodes match the predicate, and use + // the star-node if so + if (starNode != null) { + List matchingChildNodes = new ArrayList<>(); + boolean findLeafChildNode = false; + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + List> predicates = _predicateEvaluators.get(childDimension); + for (Predicate predicate : predicates) { + long val = childNode.getDimensionValue(); + if (predicate.test(val)) { + matchingChildNodes.add(childNode); + findLeafChildNode |= childNode.isLeaf(); + break; + } + } + } + if (matchingChildNodes.size() == numChildren - 1) { + // All the child nodes (except for the star-node) match the predicate, use the star-node + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else { + // Some child nodes do not match the predicate, use the matching child nodes + queue.addAll(matchingChildNodes); + foundLeafNode |= findLeafChildNode; + } + } else { + // Cannot use the star-node, use the matching child nodes + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + List> predicates = _predicateEvaluators.get(childDimension); + for (Predicate predicate : predicates) { + if (predicate.test(childNode.getDimensionValue())) { + queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + break; + } + } + } + } + } else { + // No predicate on the next level + + if (starNode != null) { + // Star-node exists, use it + queue.add(starNode); + foundLeafNode |= starNode.isLeaf(); + } else { + // Star-node does not exist or cannot be used, add all non-star nodes to the queue + Iterator childrenIterator = starTreeNode.getChildrenIterator(); + while (childrenIterator.hasNext()) { + StarTreeNode childNode = childrenIterator.next(); + if (childNode.getDimensionValue() != StarTreeNode.ALL) { + queue.add(childNode); + foundLeafNode |= childNode.isLeaf(); + } + } + } + } + } + + adder = docsWithField.grow(docIds.size()); + for (int id : docIds) { + adder.add(id); + } + return new StarTreeResult( + docsWithField, + globalRemainingPredicateColumns != null ? globalRemainingPredicateColumns : Collections.emptySet(), + matchedDocsCountInStarTree, + maxDocNum + ); + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java b/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java new file mode 100644 index 0000000000000..94b293335e3cc --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/StarTreeQuery.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.search.ConstantScoreScorer; +import org.apache.lucene.search.ConstantScoreWeight; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.QueryVisitor; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Accountable; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; +import org.opensearch.index.codec.composite.CompositeIndexReader; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** Query class for querying star tree data structure */ +public class StarTreeQuery extends Query implements Accountable { + + /** + * Star tree field info + * This is used to get the star tree data structure + */ + CompositeIndexFieldInfo starTree; + + /** + * Map of field name to a list of predicates to be applied on that field + * This is used to filter the data based on the predicates + */ + Map>> compositePredicateMap; + + /** + * Set of field names to be used for grouping the results + * This is used to group the data based on the fields + */ + List groupByColumns; + + public StarTreeQuery( + CompositeIndexFieldInfo starTree, + Map>> compositePredicateMap, + List groupByColumns + ) { + this.starTree = starTree; + this.compositePredicateMap = compositePredicateMap; + this.groupByColumns = groupByColumns; + } + + @Override + public String toString(String field) { + return null; + } + + @Override + public void visit(QueryVisitor visitor) { + visitor.visitLeaf(this); + } + + @Override + public boolean equals(Object obj) { + return sameClassAs(obj); + } + + @Override + public int hashCode() { + return classHash(); + } + + @Override + public long ramBytesUsed() { + return 0; + } + + @Override + public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException { + return new ConstantScoreWeight(this, boost) { + @Override + public Scorer scorer(LeafReaderContext context) throws IOException { + SegmentReader reader = Lucene.segmentReader(context.reader()); + + // We get the 'CompositeIndexReader' instance so that we can get StarTreeValues + if (!(reader.getDocValuesReader() instanceof CompositeIndexReader)) return null; + + CompositeIndexReader starTreeDocValuesReader = (CompositeIndexReader) reader.getDocValuesReader(); + List compositeIndexFields = starTreeDocValuesReader.getCompositeIndexFields(); + StarTreeValues starTreeValues = null; + if (compositeIndexFields != null && !compositeIndexFields.isEmpty()) { + starTreeValues = (StarTreeValues) starTreeDocValuesReader.getCompositeIndexValues(starTree); + } else { + return null; + } + + StarTreeFilter filter = new StarTreeFilter(starTreeValues, compositePredicateMap, groupByColumns); + DocIdSetIterator result = filter.getStarTreeResult(); + return new ConstantScoreScorer(this, score(), scoreMode, result); + } + + @Override + public boolean isCacheable(LeafReaderContext ctx) { + return false; + } + }; + } + + public CompositeIndexFieldInfo getStarTree() { + return starTree; + } +} diff --git a/server/src/main/java/org/opensearch/search/startree/package-info.java b/server/src/main/java/org/opensearch/search/startree/package-info.java new file mode 100644 index 0000000000000..cb0802988f1f9 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/startree/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.startree; diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java index 8de583f5aefc5..b3b528eb55803 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.lucene99.Lucene99Codec; import org.apache.lucene.index.BaseStarTreeBuilder; @@ -55,6 +56,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -180,7 +182,11 @@ public static void setup() throws IOException { builder = new BaseStarTreeBuilder(metaOut, dataOut, starTreeField, writeState, mapperService) { @Override - public void build(List starTreeValuesSubs) throws IOException {} + public void build( + List starTreeValuesSubs, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException {} @Override public void appendStarTreeDocument(StarTreeDocument starTreeDocument) throws IOException {} diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java index 7ccb9bcd91c30..1e655d7b3d455 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.lucene99.Lucene99Codec; import org.apache.lucene.index.DocValuesType; @@ -56,6 +57,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -85,6 +87,7 @@ public class OnHeapStarTreeBuilderTests extends OpenSearchTestCase { private SegmentWriteState writeState; private IndexOutput dataOut; private IndexOutput metaOut; + private DocValuesConsumer docValuesConsumer; @Before public void setup() throws IOException { @@ -103,6 +106,7 @@ public void setup() throws IOException { ); DocValuesProducer docValuesProducer = mock(DocValuesProducer.class); + docValuesConsumer = mock(DocValuesConsumer.class); compositeField = new StarTreeField( "test", @@ -487,7 +491,7 @@ public void test_build_halfFloatMetrics() throws IOException { } Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(8, resultStarTreeDocuments.size()); @@ -552,7 +556,7 @@ public void test_build_floatMetrics() throws IOException { } Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(8, resultStarTreeDocuments.size()); @@ -606,7 +610,7 @@ public void test_build_longMetrics() throws IOException { } Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(8, resultStarTreeDocuments.size()); @@ -650,7 +654,7 @@ public void test_build() throws IOException { } Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(8, resultStarTreeDocuments.size());