Commit

add parse regexp command for PPL
Signed-off-by: YANGDB <[email protected]>
YANG-DB committed Aug 7, 2024
1 parent a91b3ef commit 16d7cc0
Showing 5 changed files with 202 additions and 0 deletions.
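This commit adds the PPL `parse` command (regex method) to the Spark translation layer. As a sketch of the intended behavior, taken from the tests below, the command applies a regular expression with a named capture group to a source field and projects the captured value as a new field:

    source = spark_catalog.default.flint_ppl_test | parse email '.+@(?<host>.+)' | fields email, host

Here `host` receives everything after the '@' of each `email` value. On the Spark side this is meant to lower to a RegExpExtract projection (note the RegExpExtract import added to CatalystQueryPlanVisitor), though the REGEX branch of visitParse is still a TODO in this commit.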
@@ -98,6 +98,44 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
}
}

  protected def createPartitionedGrokEmailTable(testTable: String): Unit = {
    spark.sql(
      s"""
         | CREATE TABLE $testTable
         | (
         |   name STRING,
         |   email STRING,
         |   street_address STRING
         | )
         | USING $tableType $tableOptions
         | PARTITIONED BY (
         |   year INT,
         |   month INT
         | )
         |""".stripMargin)

    // Sample rows; email values are synthetic test addresses (name@domain)
    val data = Seq(
      ("Alice", "alice@example.com", "123 Main St, Seattle", 2023, 4),
      ("Bob", "bob@test.org", "456 Elm St, Portland", 2023, 5),
      ("Charlie", "charlie@domain.net", "789 Pine St, San Francisco", 2023, 4),
      ("David", "david@anotherdomain.com", "101 Maple St, New York", 2023, 5),
      ("Eve", "eve@examples.com", "202 Oak St, Boston", 2023, 4),
      ("Frank", "frank@sample.org", "303 Cedar St, Austin", 2023, 5),
      ("Grace", "grace@demo.net", "404 Birch St, Chicago", 2023, 4),
      ("Hank", "hank@demonstration.com", "505 Spruce St, Miami", 2023, 5),
      ("Ivy", "ivy@examples.com", "606 Fir St, Denver", 2023, 4),
      ("Jack", "jack@sample.net", "707 Ash St, Seattle", 2023, 5))

    data.foreach { case (name, email, street_address, year, month) =>
      spark.sql(s"""
           | INSERT INTO $testTable
           | PARTITION (year=$year, month=$month)
           | VALUES ('$name', '$email', '$street_address')
           | """.stripMargin)
    }
  }

  protected def createPartitionedAddressTable(testTable: String): Unit = {
    sql(s"""
         | CREATE TABLE $testTable
@@ -0,0 +1,65 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, RegExpExtract}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.{QueryTest, Row}

class FlintSparkPPLParseITSuite
    extends QueryTest
    with LogicalPlanTestUtils
    with FlintPPLSuite
    with StreamTest {

  /** Test table and index name */
  private val testTable = "spark_catalog.default.flint_ppl_test"

  override def beforeAll(): Unit = {
    super.beforeAll()

    // Create test table
    createPartitionedGrokEmailTable(testTable)
  }

  protected override def afterEach(): Unit = {
    super.afterEach()
    // Stop all streaming jobs if any
    spark.streams.active.foreach { job =>
      job.stop()
      job.awaitTermination()
    }
  }

test("test parse email expressions parsing") {
val frame = sql(s"""
| source = $testTable | parse email '.+@(?<host>.+)' | fields email, host ;
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] =
Array(Row("Jake", 70), Row("Hello", 30), Row("John", 25), Row("Jane", 20))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val fieldsProjectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age"))
val evalProjectList = Seq(UnresolvedStar(None), Alias(Literal(1), "col")())
val expectedPlan = Project(fieldsProjectList, Project(evalProjectList, table))
// Compare the two plans
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

}
@@ -11,6 +11,7 @@
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.Predicate;
import org.apache.spark.sql.catalyst.expressions.RegExpExtract;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
@@ -50,6 +51,7 @@
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
import org.opensearch.sql.ast.tree.Relation;
@@ -224,6 +226,33 @@ private Expression visitExpression(UnresolvedExpression expression, CatalystPlan
        return expressionAnalyzer.analyze(expression, context);
    }

    @Override
    public LogicalPlan visitParse(Parse node, CatalystPlanContext context) {
        LogicalPlan child = node.getChild().get(0).accept(this, context);
        switch (node.getParseMethod()) {
            case GROK:
                throw new IllegalStateException("Not Supported operation : GROK");
            case PATTERNS:
                throw new IllegalStateException("Not Supported operation : PATTERNS");
            case REGEX:
                // REGEX is the only supported parse method; translated below
                break;
        }
        UnresolvedExpression sourceField = node.getSourceField();
        Literal pattern = node.getPattern();
        // TODO lower 'pattern' into a RegExpExtract over the source field, aliased by the
        //  pattern's named capture group; until then the source field is aliased to itself
        //  as a placeholder so the projection below is well-formed
        List<UnresolvedExpression> aliases = new ArrayList<>();
        Alias alias = new Alias(sourceField.toString(), sourceField);
        aliases.add(alias);
        if (context.getNamedParseExpressions().isEmpty()) {
            // Create an UnresolvedStar for all-fields projection
            context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.<Seq<String>>empty()));
        }
        // analyzing the aliases pushes the resulting named expressions onto the context
        visitExpressionList(aliases, context);
        Seq<NamedExpression> projectExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p);
        // build the plan with the projection step
        child = context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(projectExpressions, p));
        return child;
    }

    @Override
    public LogicalPlan visitEval(Eval node, CatalystPlanContext context) {
        LogicalPlan child = node.getChild().get(0).accept(this, context);
@@ -26,12 +26,14 @@
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.Not;
import org.opensearch.sql.ast.expression.Or;
import org.opensearch.sql.ast.expression.ParseMethod;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.Span;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.ast.expression.UnresolvedArgument;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.expression.Xor;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.ppl.utils.ArgumentFactory;

@@ -0,0 +1,68 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark.ppl

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, NamedExpression, RegExpExtract}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

class PPLLogicalPlanParseTranslatorTestSuite
    extends SparkFunSuite
    with PlanTest
    with LogicalPlanTestUtils
    with Matchers {

  private val planTransformer = new CatalystQueryPlanVisitor()
  private val pplParser = new PPLSyntaxParser()

test("test parse email & host expressions") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(pplParser, "source=t | parse email '.+@(?<host>.+)' | fields email, host", false),
context)
val evalProjectList: Seq[NamedExpression] =
Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(1), "b")())
val expectedPlan = Project(
seq(UnresolvedAttribute("c")),
Project(evalProjectList, UnresolvedRelation(Seq("t"))))
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

test("test parse email expression") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(pplParser, "source=t | parse email '.+@(?<email>.+)' | fields email", false),
context)
val evalProjectList: Seq[NamedExpression] =
Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(1), "b")())
val expectedPlan = Project(
seq(UnresolvedAttribute("c")),
Project(evalProjectList, UnresolvedRelation(Seq("t"))))
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

test("test parse email expression, generate new host field and eval result") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(pplParser, "source=t | parse email '.+@(?<host>.+)' | eval eval_result=1 | fields host, eval_result", false),
context)
val evalProjectList: Seq[NamedExpression] =
Seq(UnresolvedStar(None), Alias(Literal(1), "a")(), Alias(Literal(1), "b")())
val expectedPlan = Project(
seq(UnresolvedAttribute("c")),
Project(evalProjectList, UnresolvedRelation(Seq("t"))))
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}
}
