PPL Lookup #407

Draft · wants to merge 4 commits into base: main
FlintSparkPPLLookupITSuite.scala
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.catalyst.expressions.{EqualTo, GreaterThan, Literal}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.{QueryTest, Row}

class FlintSparkPPLLookupITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

/** Test table and lookup table names */
private val testTable = "spark_catalog.default.flint_ppl_test"
private val lookupTable = "spark_catalog.default.flint_ppl_test_lookup"

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createPartitionedStateCountryTable(testTable)
createOccupationTable(lookupTable)
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

test("create ppl simple query test") {
val frame = sql(s"""
| source = $testTable | where age > 20 | lookup flint_ppl_test_lookup name
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()

assert(results.length == 3)

// Define the expected results
val expectedResults: Array[Row] = Array(
Row("Jake", 70, "California", "USA", 2023, 4, "Jake", "Engineer", "England", 100000, 2023, 4),
Row("Hello", 30, "New York", "USA", 2023, 4, "Hello", "Artist", "USA", 70000, 2023, 4),
Row("John", 25, "Ontario", "Canada", 2023, 4, "John", "Doctor", "Canada", 120000, 2023, 4))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val expectedPlan: LogicalPlan =
Project(
Seq(UnresolvedStar(None)),
Join(
Filter(
GreaterThan(UnresolvedAttribute("age"), Literal(20)),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))),
Project(
Seq(UnresolvedStar(None)),
UnresolvedRelation(Seq("flint_ppl_test_lookup"))),
JoinType.apply("left"),
Option.apply(EqualTo(UnresolvedAttribute("name"), UnresolvedAttribute("name"))),
JoinHint.NONE))
// Compare the two plans
assert(expectedPlan === logicalPlan)
}
}


ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
@@ -65,6 +65,7 @@ NUM: 'NUM';
// ARGUMENT KEYWORDS
KEEPEMPTY: 'KEEPEMPTY';
CONSECUTIVE: 'CONSECUTIVE';
APPENDONLY: 'APPENDONLY';
DEDUP_SPLITVALUES: 'DEDUP_SPLITVALUES';
PARTITIONS: 'PARTITIONS';
ALLNUM: 'ALLNUM';
14 changes: 14 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -38,6 +38,7 @@ commands
| renameCommand
| statsCommand
| dedupCommand
| lookupCommand
| sortCommand
| evalCommand
| headCommand
@@ -107,6 +108,18 @@ dedupCommand
: DEDUP (number = integerLiteral)? fieldList (KEEPEMPTY EQUAL keepempty = booleanLiteral)? (CONSECUTIVE EQUAL consecutive = booleanLiteral)?
;

matchFieldWithOptAs
: originalMatchField = fieldExpression (AS asMatchField = fieldExpression)?
;

copyFieldWithOptAs
: originalCopyField = fieldExpression (AS asCopyField = fieldExpression)?
;

lookupCommand
: LOOKUP tableSource matchFieldWithOptAs (COMMA matchFieldWithOptAs)* (APPENDONLY EQUAL appendonly = booleanLiteral)? (copyFieldWithOptAs (COMMA copyFieldWithOptAs)*)?
;
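
// Examples this rule is intended to accept, as I read it (table and field names
// are hypothetical):
//   source = employees | lookup occupations name
//   source = employees | lookup occupations name AS emp_name APPENDONLY = true salary AS base_salary, country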

sortCommand
: SORT sortbyClause
;
@@ -848,6 +861,7 @@ keywordsCanBeId
| RENAME
| STATS
| DEDUP
| LOOKUP
| SORT
| EVAL
| HEAD
ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java
@@ -43,6 +43,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
@@ -208,6 +209,10 @@ public T visitDedupe(Dedupe node, C context) {
return visitChildren(node, context);
}

public T visitLookup(Lookup node, C context) {
return visitChildren(node, context);
}

public T visitHead(Head node, C context) {
return visitChildren(node, context);
}
ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Lookup.java
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Map;

import java.util.List;

/** AST node representing the Lookup operation. */
public class Lookup extends UnresolvedPlan {
private UnresolvedPlan child;
private final String tableName;
private final List<Map> matchFieldList;
private final List<Argument> options;
private final List<Alias> copyFieldList;

public Lookup(UnresolvedPlan child, String tableName, List<Map> matchFieldList, List<Argument> options, List<Alias> copyFieldList) {
this.child = child;
this.tableName = tableName;
this.matchFieldList = matchFieldList;
this.options = options;
this.copyFieldList = copyFieldList;
}

public Lookup(String tableName, List<Map> matchFieldList, List<Argument> options, List<Alias> copyFieldList) {
this.tableName = tableName;
this.matchFieldList = matchFieldList;
this.options = options;
this.copyFieldList = copyFieldList;
}

@Override
public Lookup attach(UnresolvedPlan child) {
this.child = child;
return this;
}

public String getTableName() {
return tableName;
}

public List<Map> getMatchFieldList() {
return matchFieldList;
}

public List<Argument> getOptions() {
return options;
}

public List<Alias> getCopyFieldList() {
return copyFieldList;
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitLookup(this, context);
}
}
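
For orientation, a hedged sketch of how a caller might assemble this node for the query source = employees | lookup occupations name AS emp_name APPENDONLY = false salary. Table and field names are invented, the usual org.opensearch.sql.ast.expression imports and constructor shapes are assumed from memory, and the origin/target orientation of Map follows my reading of the grammar rather than a confirmed convention:

// hypothetical construction; the parser's AstBuilder normally does this
Lookup lookup = new Lookup(
"occupations",
List.of(new Map(
new Field(QualifiedName.of("name")), // match field as written in the query
new Field(QualifiedName.of("emp_name")))), // its AS alias
List.of(new Argument("appendonly", new Literal(false, DataType.BOOLEAN))),
List.of(new Alias("salary", new Field(QualifiedName.of("salary")))));
lookup.attach(childPlan); // childPlan: AST of the preceding pipeline stage (hypothetical)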
ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
@@ -7,17 +7,23 @@

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.Predicate;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.plans.JoinType;
import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
import org.apache.spark.sql.catalyst.plans.logical.Join;
import org.apache.spark.sql.catalyst.plans.logical.JoinHint;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.AllFields;
@@ -32,6 +38,7 @@
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.Map;
import org.opensearch.sql.ast.expression.Not;
import org.opensearch.sql.ast.expression.Or;
import org.opensearch.sql.ast.expression.QualifiedName;
@@ -49,16 +56,20 @@
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
import org.opensearch.sql.ast.tree.Relation;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ppl.utils.AggregatorTranslator;
import org.opensearch.sql.ppl.utils.ComparatorTransformer;
import org.opensearch.sql.ppl.utils.SortUtils;
import org.sparkproject.guava.collect.Iterables;
import scala.Option;
import scala.collection.Seq;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -256,6 +267,93 @@ public LogicalPlan visitDedupe(Dedupe node, CatalystPlanContext context) {
throw new IllegalStateException("Not Supported operation : dedupe ");
}

@Override
public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) {
Node root = node.getChild().get(0);

while(!root.getChild().isEmpty()) {
root = root.getChild().get(0);
}
Comment on lines +274 to +276

Member:
This loop finds the root Relation and uses it as the lookup's source table, but we may have multiple lookup commands, such as source=t1 | lookup t2 | ... | lookup t3

Contributor:
The source=<table> is used to specify the "source" table name.
It's not required to specify (repeat) the name of the "source" table within the lookup command, so I guess finding the root Relation node is OK. Am I missing something?
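
For example, with source = t1 | lookup t2 name | where x > 0 | lookup t3 name (hypothetical tables), both lookup translations would walk back to the same root Relation t1, which is consistent with this reading: the source table is named once and implied for every later lookup.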


org.opensearch.sql.ast.tree.Relation source = (org.opensearch.sql.ast.tree.Relation) root;

node.getChild().get(0).accept(this, context);

//TODO: not sure how to implement appendonly
Member:
To implement appendonly, the output Project (parent of Join) should be

Project [other fields in source, CASE WHEN isnull('source.copyField) THEN coalesce('source.copyField, 'lookup.copyField) AS copyField ELSE 'lookup.copyField END AS copyField]
  +- Join LeftOuter

In CASE WHEN expression1 ELSE expression2, expression1 handles appendonly (do not overwrite when the source field already exists), while the ELSE branch handles overwriting the existing field.

Contributor:
Doesn't this mean we need to know the list of fields available in the "source" table, in order to build a projection consisting of two lists:
the fields to overwrite with values from the lookup table, and the fields that should remain untouched?
I'm not sure whether it is possible to retrieve such a list of available fields.
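
A rough sketch, not part of this PR, of the per-field expression discussed above, written in the style of this visitor. The seq/of helpers and the source/lookup qualifiers are assumptions; Coalesce and UnresolvedAttribute are standard Catalyst classes:

// appendonly = true: keep the source value when present, otherwise take the lookup value
Expression sourceField = UnresolvedAttribute$.MODULE$.apply("source.salary"); // hypothetical qualifier
Expression lookupField = UnresolvedAttribute$.MODULE$.apply("lookup.salary"); // hypothetical qualifier
Expression appendOnlyValue = new Coalesce(seq(of(sourceField, lookupField)));
// appendonly = false: always overwrite with the lookup value
Expression overwriteValue = lookupField;
// the CASE WHEN form quoted above could be built analogously with
// org.apache.spark.sql.catalyst.expressions.CaseWhen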

boolean appendonly = !node.getOptions().isEmpty() && (Boolean) node.getOptions().get(0).getValue().getValue();

LogicalPlan lookupRelation = new UnresolvedRelation(seq(of(node.getTableName())), CaseInsensitiveStringMap.empty(), false);
org.apache.spark.sql.catalyst.plans.logical.Project lookupProject;

List<NamedExpression> lookupRelationFields = buildLookupTableFieldList(node, context);
if (!lookupRelationFields.isEmpty()) {
lookupProject = new org.apache.spark.sql.catalyst.plans.logical.Project(seq(lookupRelationFields), lookupRelation);
} else {
lookupProject = new org.apache.spark.sql.catalyst.plans.logical.Project(seq(of(new UnresolvedStar(Option.empty()))), lookupRelation);
}

//TODO: use node.getCopyFieldList() to pre-filter the right-hand logical plan
//and return only the fields listed there; rename fields when requested

Expression joinCondition = buildLookupTableJoinCondition(node.getMatchFieldList(), source.getTableQualifiedName().toString(), node.getTableName(), context);

return context.apply(p -> new Join(
p, // original query (left side)
lookupProject, // lookup subquery (right side)
JoinType.apply("left"), // left outer join, see https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-join.html
Option.apply(joinCondition), // join on the match fields
JoinHint.NONE() // TODO: evaluate join hints, https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html#join-hints-types
));
}

private List<NamedExpression> buildLookupTableFieldList(Lookup node, CatalystPlanContext context) {
if (node.getCopyFieldList().isEmpty()) {
return Collections.emptyList();
} else {
//TODO: should we also append the fields used to match records (node.getMatchFieldList())?
List<NamedExpression> copyFields = node.getCopyFieldList().stream()
.map(copyField -> (NamedExpression) expressionAnalyzer.visitAlias(copyField, context))
.collect(Collectors.toList());
return copyFields;
}
}

private org.opensearch.sql.ast.expression.Field prefixField(List<String> prefixParts, UnresolvedExpression field) {
org.opensearch.sql.ast.expression.Field in = (org.opensearch.sql.ast.expression.Field) field;
org.opensearch.sql.ast.expression.QualifiedName inq = (org.opensearch.sql.ast.expression.QualifiedName) in.getField();
Iterable<String> finalParts = Iterables.concat(prefixParts, inq.getParts());
return new org.opensearch.sql.ast.expression.Field(new org.opensearch.sql.ast.expression.QualifiedName(finalParts), in.getFieldArgs());
}

private Expression buildLookupTableJoinCondition(List<Map> fieldMap, String sourceTableName, String lookupTableName, CatalystPlanContext context) {
int size = fieldMap.size();

List<Expression> allEqlExpressions = new ArrayList<>(size);

for (Map map : fieldMap) {

//todo do we need to run prefixField? match fields are anyway handled as qualifiedName?
// Expression origin = visitExpression(prefixField(of(sourceTableName.split("\\.")),map.getOrigin()), context);
// Expression target = visitExpression(prefixField(of(lookupTableName.split("\\.")),map.getTarget()), context);
Expression origin = visitExpression(map.getOrigin(), context);
Expression target = visitExpression(map.getTarget(), context);

// important: retain the named parse expressions gathered by visitExpression (identity transform)
context.retainAllNamedParseExpressions(e -> e);

allEqlExpressions.add(new EqualTo(origin, target));
}

return allEqlExpressions.stream().reduce(org.apache.spark.sql.catalyst.expressions.And::new).orElse(null);
}
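
// For two match fields (name, country), the reduce above yields, conceptually:
//   EqualTo(name, name) AND EqualTo(country, country)
// How each operand resolves (source vs. lookup side) still depends on the
// prefixField question left open in the TODO above.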

/**
* Expression Analyzer.
*/