Translate PPL dedup Command Part 2: allowedDuplication>1 (#543)
* Translate PPL dedup Command Part 2: allowedDuplication>1

Signed-off-by: Lantao Jin <[email protected]>

* update document

Signed-off-by: Lantao Jin <[email protected]>

* remove the CatalystQueryPlanVisitor.java.orig

Signed-off-by: Lantao Jin <[email protected]>

* refactor

Signed-off-by: Lantao Jin <[email protected]>

* add IT for reordering field list

Signed-off-by: Lantao Jin <[email protected]>

* remove useless code

Signed-off-by: Lantao Jin <[email protected]>

---------

Signed-off-by: Lantao Jin <[email protected]>
Co-authored-by: YANGDB <[email protected]>
LantaoJin and YANG-DB authored Aug 30, 2024
1 parent eb2bbb6 commit cc52b1c
Showing 6 changed files with 527 additions and 101 deletions.
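For orientation: this commit implements the `dedup` command for duplicate counts greater than one, keeping up to N events per combination of the listed fields. The new integration tests exercise it through queries such as the following sketch (the table name is a placeholder, and it assumes the Flint PPL Spark extension is enabled, as in the `FlintPPLSuite` tests below):

```scala
// Keep at most 2 rows per distinct `name`, then project the name column.
val frame = spark.sql(
  """
    | source = my_test_table | dedup 2 name | fields name
    | """.stripMargin)
frame.show()
```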
@@ -11,7 +11,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, IsNotNull, IsNull, Or}
import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Filter, LogicalPlan, Project, Union}
import org.apache.spark.sql.streaming.StreamTest

-class FlintSparkPPLDedupITSuite
+class FlintSparkPPLDedupeITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
@@ -187,7 +187,7 @@ class FlintSparkPPLDedupITSuite
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}

ignore("test dedupe 2 name") {
test("test dedupe 2 name") {
val frame = sql(s"""
| source = $testTable| dedup 2 name | fields name
| """.stripMargin)
@@ -200,7 +200,7 @@
assert(results.sorted.sameElements(expectedResults.sorted))
}

ignore("test dedupe 2 name, category") {
test("test dedupe 2 name, category") {
val frame = sql(s"""
| source = $testTable| dedup 2 name, category | fields name, category
| """.stripMargin)
@@ -225,7 +225,7 @@
assert(results.sorted.sameElements(expectedResults.sorted))
}

ignore("test dedupe 2 name KEEPEMPTY=true") {
test("test dedupe 2 name KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable| dedup 2 name KEEPEMPTY=true | fields name, category
| """.stripMargin)
@@ -259,7 +259,7 @@ class FlintSparkPPLDedupITSuite
.sameElements(expectedResults.sorted.map(_.getAs[String](0))))
}

ignore("test dedupe 2 name, category KEEPEMPTY=true") {
test("test dedupe 2 name, category KEEPEMPTY=true") {
val frame = sql(s"""
| source = $testTable| dedup 2 name, category KEEPEMPTY=true | fields name, category
| """.stripMargin)
@@ -307,4 +307,140 @@ class FlintSparkPPLDedupITSuite
| """.stripMargin))
assert(ex.getMessage.contains("Consecutive deduplication is not supported"))
}

test("test dedupe 1 category, name - reorder field list won't impact output order") {
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("B", "Y"))
implicit val twoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => (row.getAs(0), row.getAs(1)))

val frame1 = sql(s"""
| source = $testTable | dedup 1 name, category
| """.stripMargin)
val results1: Array[Row] = frame1.drop("id").collect()

val frame2 = sql(s"""
| source = $testTable | dedup 1 category, name
| """.stripMargin)
val results2: Array[Row] = frame2.drop("id").collect()

assert(results1.sorted.sameElements(results2.sorted))
assert(results1.sorted.sameElements(expectedResults.sorted))
}

test(
"test dedupe 1 category, name KEEPEMPTY=true - reorder field list won't impact output order") {
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "Y"),
Row("B", "Z"),
Row("C", "X"),
Row("D", "Z"),
Row("B", "Y"),
Row(null, "Y"),
Row("E", null),
Row(null, "X"),
Row("B", null),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})

val frame1 = sql(s"""
| source = $testTable | dedup 1 name, category KEEPEMPTY=true
| """.stripMargin)
val results1: Array[Row] = frame1.drop("id").collect()

val frame2 = sql(s"""
| source = $testTable | dedup 1 category, name KEEPEMPTY=true
| """.stripMargin)
val results2: Array[Row] = frame2.drop("id").collect()

assert(results1.sorted.sameElements(results2.sorted))
assert(results1.sorted.sameElements(expectedResults.sorted))
}

test("test dedupe 2 category, name - reorder field list won't impact output order") {
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "X"),
Row("A", "Y"),
Row("A", "Y"),
Row("B", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"))
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](row => {
val value = row.getAs[String](0)
if (value == null) String.valueOf(Int.MaxValue) else value
})

val frame1 = sql(s"""
| source = $testTable | dedup 2 name, category
| """.stripMargin)
val results1: Array[Row] = frame1.drop("id").collect()

val frame2 = sql(s"""
| source = $testTable | dedup 2 category, name
| """.stripMargin)
val results2: Array[Row] = frame2.drop("id").collect()

assert(results1.sorted.sameElements(results2.sorted))
assert(results1.sorted.sameElements(expectedResults.sorted))
}

test(
"test dedupe 2 category, name KEEPEMPTY=true - reorder field list won't impact output order") {
val expectedResults: Array[Row] = Array(
Row("A", "X"),
Row("A", "X"),
Row("A", "Y"),
Row("A", "Y"),
Row("B", "Y"),
Row("B", "Z"),
Row("B", "Z"),
Row("C", "X"),
Row("C", "X"),
Row("D", "Z"),
Row(null, "Y"),
Row("E", null),
Row(null, "X"),
Row("B", null),
Row(null, "Z"),
Row(null, null))
implicit val nullableTwoColsRowOrdering: Ordering[Row] =
Ordering.by[Row, (String, String)](row => {
val value0 = row.getAs[String](0)
val value1 = row.getAs[String](1)
(
if (value0 == null) String.valueOf(Int.MaxValue) else value0,
if (value1 == null) String.valueOf(Int.MaxValue) else value1)
})

val frame1 = sql(s"""
| source = $testTable | dedup 2 name, category KEEPEMPTY=true
| """.stripMargin)
val results1: Array[Row] = frame1.drop("id").collect()

val frame2 = sql(s"""
| source = $testTable | dedup 2 category, name KEEPEMPTY=true
| """.stripMargin)
val results2: Array[Row] = frame2.drop("id").collect()

assert(results1.sorted.sameElements(results2.sorted))
assert(results1.sorted.sameElements(expectedResults.sorted))
}
}
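The implicit orderings in the tests above exist to make null-containing result arrays comparable: each null value is mapped to a fixed sentinel string before sorting, so two arrays sorted under the same ordering can be checked with `sameElements`. A standalone sketch of the pattern (the name is illustrative):

```scala
import org.apache.spark.sql.Row

// Map null column values to a fixed sentinel so the ordering is total.
// Only consistency across the compared arrays matters, not where the
// sentinel itself happens to land in the sort order.
val nullSafeTwoColOrdering: Ordering[Row] =
  Ordering.by[Row, (String, String)] { row =>
    def nullSafe(i: Int): String = {
      val v = row.getAs[String](i)
      if (v == null) String.valueOf(Int.MaxValue) else v
    }
    (nullSafe(0), nullSafe(1))
  }
```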
7 changes: 5 additions & 2 deletions ppl-spark-integration/README.md
@@ -298,8 +298,11 @@ Limitation: Overriding existing field is unsupported, following queries throw ex
- `source = table | dedup 1 a,b | fields a,b,c`
- `source = table | dedup 1 a keepempty=true | fields a,b,c`
- `source = table | dedup 1 a,b keepempty=true | fields a,b,c`
-- `source = table | dedup 1 a consecutive=true| fields a,b,c` (Unsupported)
-- `source = table | dedup 2 a | fields a,b,c` (Unsupported)
+- `source = table | dedup 2 a | fields a,b,c`
+- `source = table | dedup 2 a,b | fields a,b,c`
+- `source = table | dedup 2 a keepempty=true | fields a,b,c`
+- `source = table | dedup 2 a,b keepempty=true | fields a,b,c`
+- `source = table | dedup 1 a consecutive=true| fields a,b,c` (Consecutive deduplication is unsupported)

**Rare**
- `source=accounts | rare gender`
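The `keepempty=true` variants above follow the plan shape shown in the visitor's removed comment below: rows whose dedup keys are all non-null are deduplicated, and rows with a null key are passed through via a union. A minimal DataFrame-level sketch of that semantics for a single key column (illustrative only; the plugin builds the Catalyst plan directly):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Equivalent of `dedup 1 a keepempty=true`: deduplicate rows with a
// non-null key, then append every row whose key is null.
def dedupKeepEmpty(df: DataFrame, keyCol: String): DataFrame = {
  val deduped = df.filter(col(keyCol).isNotNull).dropDuplicates(keyCol)
  val empties = df.filter(col(keyCol).isNull)
  deduped.union(empties)
}
```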
@@ -17,11 +17,9 @@
import org.apache.spark.sql.catalyst.expressions.SortDirection;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
-import org.apache.spark.sql.catalyst.plans.logical.Deduplicate;
import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
-import org.apache.spark.sql.catalyst.plans.logical.Union;
import org.apache.spark.sql.execution.command.DescribeTableCommand;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -88,6 +86,10 @@
import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation;
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.translate;
+import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEvents;
+import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEventsAndKeepEmpty;
+import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainOneDuplicateEvent;
+import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainOneDuplicateEventAndKeepEmpty;
import static org.opensearch.sql.ppl.utils.JoinSpecTransformer.join;
import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField;
import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window;
@@ -350,97 +352,29 @@ public LogicalPlan visitDedupe(Dedupe node, CatalystPlanContext context) {
}
visitFieldList(node.getFields(), context);
// Columns to deduplicate
-Seq<org.apache.spark.sql.catalyst.expressions.Attribute> dedupFields
+Seq<org.apache.spark.sql.catalyst.expressions.Attribute> dedupeFields
= context.retainAllNamedParseExpressions(e -> (org.apache.spark.sql.catalyst.expressions.Attribute) e);
+// Although we can also use the Window operator to translate this as allowedDuplication > 1 did,
+// adding Aggregate operator could achieve better performance.
if (allowedDuplication == 1) {
if (keepEmpty) {
-// Union
-// :- Deduplicate ['a, 'b]
-// : +- Filter (isnotnull('a) AND isnotnull('b)
-// : +- Project
-// : +- UnresolvedRelation
-// +- Filter (isnull('a) OR isnull('a))
-// +- Project
-// +- UnresolvedRelation
-
-context.apply(p -> {
-Expression isNullExpr = buildIsNullFilterExpression(node, context);
-LogicalPlan right = new org.apache.spark.sql.catalyst.plans.logical.Filter(isNullExpr, p);
-
-Expression isNotNullExpr = buildIsNotNullFilterExpression(node, context);
-LogicalPlan left =
-new Deduplicate(dedupFields,
-new org.apache.spark.sql.catalyst.plans.logical.Filter(isNotNullExpr, p));
-return new Union(seq(left, right), false, false);
-});
-return context.getPlan();
+return retainOneDuplicateEventAndKeepEmpty(node, dedupeFields, expressionAnalyzer, context);
} else {
-// Deduplicate ['a, 'b]
-// +- Filter (isnotnull('a) AND isnotnull('b))
-// +- Project
-// +- UnresolvedRelation
-
-Expression isNotNullExpr = buildIsNotNullFilterExpression(node, context);
-context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Filter(isNotNullExpr, p));
-// Todo DeduplicateWithinWatermark in streaming dataset?
-return context.apply(p -> new Deduplicate(dedupFields, p));
+return retainOneDuplicateEvent(node, dedupeFields, expressionAnalyzer, context);
}
} else {
-// TODO
-throw new UnsupportedOperationException("Number of duplicate events greater than 1 is not supported");
+if (keepEmpty) {
+return retainMultipleDuplicateEventsAndKeepEmpty(node, allowedDuplication, expressionAnalyzer, context);
+} else {
+return retainMultipleDuplicateEvents(node, allowedDuplication, expressionAnalyzer, context);
+}
}
}

-private Expression buildIsNotNullFilterExpression(Dedupe node, CatalystPlanContext context) {
-visitFieldList(node.getFields(), context);
-Seq<Expression> isNotNullExpressions =
-context.retainAllNamedParseExpressions(
-org.apache.spark.sql.catalyst.expressions.IsNotNull$.MODULE$::apply);
-
-Expression isNotNullExpr;
-if (isNotNullExpressions.size() == 1) {
-isNotNullExpr = isNotNullExpressions.apply(0);
-} else {
-isNotNullExpr = isNotNullExpressions.reduce(
-new scala.Function2<Expression, Expression, Expression>() {
-@Override
-public Expression apply(Expression e1, Expression e2) {
-return new org.apache.spark.sql.catalyst.expressions.And(e1, e2);
-}
-}
-);
-}
-return isNotNullExpr;
-}
-
-private Expression buildIsNullFilterExpression(Dedupe node, CatalystPlanContext context) {
-visitFieldList(node.getFields(), context);
-Seq<Expression> isNullExpressions =
-context.retainAllNamedParseExpressions(
-org.apache.spark.sql.catalyst.expressions.IsNull$.MODULE$::apply);
-
-Expression isNullExpr;
-if (isNullExpressions.size() == 1) {
-isNullExpr = isNullExpressions.apply(0);
-} else {
-isNullExpr = isNullExpressions.reduce(
-new scala.Function2<Expression, Expression, Expression>() {
-@Override
-public Expression apply(Expression e1, Expression e2) {
-return new org.apache.spark.sql.catalyst.expressions.Or(e1, e2);
-}
-}
-);
-}
-return isNullExpr;
-}

/**
* Expression Analyzer.
*/
-private static class ExpressionAnalyzer extends AbstractNodeVisitor<Expression, CatalystPlanContext> {
+public static class ExpressionAnalyzer extends AbstractNodeVisitor<Expression, CatalystPlanContext> {

public Expression analyze(UnresolvedExpression unresolved, CatalystPlanContext context) {
return unresolved.accept(this, context);
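The four `DedupeTransformer` helpers referenced above are defined in a file not expanded in this view, but the retained comment hints at the strategy for `allowedDuplication > 1`: a window function that numbers rows per key and keeps the first N. A DataFrame-level sketch of that idea (assumed; the actual implementation assembles Catalyst operators directly):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Equivalent of `dedup N key...`: partition by the dedup columns, number
// the rows in each partition (order within a partition is arbitrary here),
// and keep only the first N.
def dedupUpToN(df: DataFrame, n: Int, keys: String*): DataFrame = {
  val w = Window.partitionBy(keys.map(col): _*).orderBy(keys.map(col): _*)
  df.withColumn("_rn", row_number().over(w))
    .filter(col("_rn") <= n)
    .drop("_rn")
}
```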
(Diffs for the remaining changed files are not expanded in this view.)
