Skip to content

Commit

Permalink
Add support for the Rare & Top PPL commands
Browse files Browse the repository at this point in the history
top [N] <field-list> [by-clause]

N: optional. Number of results to return. Default: 10.
field-list: mandatory. Comma-delimited list of field names.
by-clause: optional. One or more fields to group the results by.
-------------------------------------------------------------------------------------------

rare <field-list> [by-clause]

field-list: mandatory. Comma-delimited list of field names.
by-clause: optional. One or more fields to group the results by.
-------------------------------------------------------------------------------------------
commands:
 - opensearch-project#461
 - opensearch-project#536
Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Aug 14, 2024
1 parent 4dde4ac commit 0128e2b
Showing 1 changed file with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,31 +184,32 @@ public LogicalPlan visitAggregation(Aggregation node, CatalystPlanContext contex
//add group by fields to context
context.getGroupingParseExpressions().addAll(groupExpList);
}

// set sort direction according to command type
List<SortDirection> sortDirections = new ArrayList<>();
if (node instanceof RareAggregation) {
sortDirections.add(Ascending$.MODULE$);
} else if(node instanceof TopAggregation) {
sortDirections.add(Descending$.MODULE$);
}

if (!sortExpList.isEmpty()) {
visitExpressionList(node.getSortExprList(), context);
Seq<SortOrder> sortElements = context.retainAllNamedParseExpressions(exp ->
new SortOrder((NamedExpression) exp, sortDirections.get(0) , sortDirections.get(0).defaultNullOrdering(), seq(new ArrayList<Expression>())));
context.apply(p -> (LogicalPlan) new org.apache.spark.sql.catalyst.plans.logical.Sort(sortElements, true, p));
}

UnresolvedExpression span = node.getSpan();
if (!Objects.isNull(span)) {
span.accept(this, context);
//add span's group alias field (most recent added expression)
context.getGroupingParseExpressions().add(context.getNamedParseExpressions().peek());
}
// build the aggregation logical step
return extractedAggregation(context);
}
// context.apply(p -> extractedAggregation(context)); TODO remove
LogicalPlan logicalPlan = extractedAggregation(context);

// set sort direction according to command type (`rare` is Asc, `top` is Desc, default to Asc)
List<SortDirection> sortDirections = new ArrayList<>();
sortDirections.add(node instanceof RareAggregation ? Ascending$.MODULE$ : node instanceof TopAggregation ? Descending$.MODULE$ : Ascending$.MODULE$);

if (!sortExpList.isEmpty()) {
visitExpressionList(node.getSortExprList(), context);
Seq<SortOrder> sortElements = context.retainAllNamedParseExpressions(exp ->
new SortOrder(exp,
sortDirections.get(0),
sortDirections.get(0).defaultNullOrdering(),
seq(new ArrayList<Expression>())));
context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Sort(sortElements, true, logicalPlan));
}
return logicalPlan;
}

private static LogicalPlan extractedAggregation(CatalystPlanContext context) {
Seq<Expression> groupingExpression = context.retainAllGroupingNamedParseExpressions(p -> p);
Expand Down

0 comments on commit 0128e2b

Please sign in to comment.