Skip to content

Commit

Permalink
Re-implement distinct operators
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Dec 25, 2024
1 parent 141655c commit 57c1a1d
Show file tree
Hide file tree
Showing 66 changed files with 4,905 additions and 3,088 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent;
import org.apache.pinot.core.query.aggregation.utils.exprminmax.ExprMinMaxObject;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.query.utils.idset.IdSets;
import org.apache.pinot.segment.local.customobject.AvgPair;
Expand Down Expand Up @@ -125,7 +124,7 @@ public enum ObjectType {
Map(8),
IntSet(9),
TDigest(10),
DistinctTable(11),
// DistinctTable(11),
DataSketch(12),
Geometry(13),
RoaringBitmap(14),
Expand Down Expand Up @@ -227,8 +226,6 @@ public static ObjectType getObjectType(Object value) {
return ObjectType.IntSet;
} else if (value instanceof TDigest) {
return ObjectType.TDigest;
} else if (value instanceof DistinctTable) {
return ObjectType.DistinctTable;
} else if (value instanceof Sketch) {
return ObjectType.DataSketch;
} else if (value instanceof KllDoublesSketch) {
Expand Down Expand Up @@ -797,36 +794,6 @@ public HyperLogLogPlus deserialize(ByteBuffer byteBuffer) {
}
};

public static final ObjectSerDe<DistinctTable> DISTINCT_TABLE_SER_DE = new ObjectSerDe<DistinctTable>() {

@Override
public byte[] serialize(DistinctTable distinctTable) {
try {
return distinctTable.toBytes();
} catch (IOException e) {
throw new IllegalStateException("Caught exception while serializing DistinctTable", e);
}
}

@Override
public DistinctTable deserialize(byte[] bytes) {
try {
return DistinctTable.fromByteBuffer(ByteBuffer.wrap(bytes));
} catch (IOException e) {
throw new IllegalStateException("Caught exception while de-serializing DistinctTable", e);
}
}

@Override
public DistinctTable deserialize(ByteBuffer byteBuffer) {
try {
return DistinctTable.fromByteBuffer(byteBuffer);
} catch (IOException e) {
throw new IllegalStateException("Caught exception while de-serializing DistinctTable", e);
}
}
};

public static final ObjectSerDe<QuantileDigest> QUANTILE_DIGEST_SER_DE = new ObjectSerDe<QuantileDigest>() {

@Override
Expand Down Expand Up @@ -1794,7 +1761,7 @@ public PriorityQueue<FunnelStepEvent> deserialize(ByteBuffer byteBuffer) {
MAP_SER_DE,
INT_SET_SER_DE,
TDIGEST_SER_DE,
DISTINCT_TABLE_SER_DE,
null, // Deprecate DISTINCT_TABLE_SER_DE
DATA_SKETCH_THETA_SER_DE,
GEOMETRY_SER_DE,
ROARING_BITMAP_SER_DE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@
package org.apache.pinot.core.operator.blocks.results;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.distinct.table.DistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;


/**
Expand Down Expand Up @@ -68,18 +64,12 @@ public DataSchema getDataSchema() {

@Override
public List<Object[]> getRows() {
List<Object[]> rows = new ArrayList<>(_distinctTable.size());
for (Record record : _distinctTable.getRecords()) {
rows.add(record.getValues());
}
return rows;
return _distinctTable.getRows();
}

@Override
public DataTable getDataTable()
throws IOException {
Collection<Object[]> rows = getRows();
return SelectionOperatorUtils.getDataTableFromRows(rows, _distinctTable.getDataSchema(),
_queryContext.isNullHandlingEnabled());
return _distinctTable.toDataTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
import org.apache.pinot.core.operator.blocks.TimeSeriesBuilderBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.distinct.table.DistinctTable;
import org.apache.pinot.core.query.distinct.table.EmptyDistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;

Expand Down Expand Up @@ -119,8 +120,9 @@ private static DistinctResultsBlock buildEmptyDistinctQueryResults(QueryContext
ColumnDataType[] columnDataTypes = new ColumnDataType[numExpressions];
// NOTE: Use STRING column data type as default for distinct query
Arrays.fill(columnDataTypes, ColumnDataType.STRING);
DistinctTable distinctTable = new DistinctTable(new DataSchema(columns, columnDataTypes), Collections.emptySet(),
queryContext.isNullHandlingEnabled());
DistinctTable distinctTable =
new EmptyDistinctTable(new DataSchema(columns, columnDataTypes), queryContext.getLimit(),
queryContext.isNullHandlingEnabled());
return new DistinctResultsBlock(distinctTable, queryContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class DistinctCombineOperator extends BaseSingleBlockCombineOperator<Dist
private static final String EXPLAIN_NAME = "COMBINE_DISTINCT";

public DistinctCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService) {
super(new DistinctResultsBlockMerger(queryContext), operators, queryContext, executorService);
super(new DistinctResultsBlockMerger(), operators, queryContext, executorService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
package org.apache.pinot.core.operator.combine;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -212,21 +210,12 @@ protected void processSegments() {
((AcquireReleaseColumnsSegmentOperator) operator).release();
}
}
Collection<Object[]> rows = resultsBlock.getRows();
if (rows != null && rows.size() >= _numRowsToKeep) {
List<Object[]> rows = resultsBlock.getRows();
assert rows != null;
int numRows = rows.size();
if (numRows >= _numRowsToKeep) {
// Segment result has enough rows, update the boundary value

Comparable segmentBoundaryValue;
if (rows instanceof PriorityQueue) {
// Results from SelectionOrderByOperator
assert ((PriorityQueue<Object[]>) rows).peek() != null;
segmentBoundaryValue = (Comparable) ((PriorityQueue<Object[]>) rows).peek()[0];
} else {
// Results from LinearSelectionOrderByOperator
assert rows instanceof List;
segmentBoundaryValue = (Comparable) ((List<Object[]>) rows).get(rows.size() - 1)[0];
}

Comparable segmentBoundaryValue = (Comparable) rows.get(numRows - 1)[0];
if (boundaryValue == null) {
boundaryValue = segmentBoundaryValue;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,17 @@
package org.apache.pinot.core.operator.combine.merger;

import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.request.context.QueryContext;


public class DistinctResultsBlockMerger implements ResultsBlockMerger<DistinctResultsBlock> {
private final QueryContext _queryContext;
private final boolean _hasOrderBy;

public DistinctResultsBlockMerger(QueryContext queryContext) {
_queryContext = queryContext;
_hasOrderBy = queryContext.getOrderByExpressions() != null;
}

@Override
public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
if (_hasOrderBy) {
return false;
}
return resultsBlock.getDistinctTable().size() >= _queryContext.getLimit();
return resultsBlock.getDistinctTable().isSatisfied();
}

@Override
public void mergeResultsBlocks(DistinctResultsBlock mergedBlock, DistinctResultsBlock blockToMerge) {
DistinctTable mergedDistinctTable = mergedBlock.getDistinctTable();
DistinctTable distinctTableToMerge = blockToMerge.getDistinctTable();
assert mergedDistinctTable != null && distinctTableToMerge != null;

// Convert the merged table into a main table if necessary in order to merge other tables
if (!mergedDistinctTable.isMainTable()) {
DistinctTable mainDistinctTable =
new DistinctTable(distinctTableToMerge.getDataSchema(), _queryContext.getOrderByExpressions(),
_queryContext.getLimit(), _queryContext.isNullHandlingEnabled());
mainDistinctTable.mergeTable(mergedDistinctTable);
mergedBlock.setDistinctTable(mainDistinctTable);
mergedDistinctTable = mainDistinctTable;
}

mergedDistinctTable.mergeTable(distinctTableToMerge);
mergedBlock.getDistinctTable().mergeDistinctTable(blockToMerge.getDistinctTable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public SelectionOnlyResultsBlockMerger(QueryContext queryContext) {

@Override
public boolean isQuerySatisfied(SelectionResultsBlock resultsBlock) {
return resultsBlock.getRows().size() >= _numRowsToKeep;
return resultsBlock.getNumRows() >= _numRowsToKeep;
}

@Override
Expand Down
Loading

0 comments on commit 57c1a1d

Please sign in to comment.