Skip to content

Commit

Permalink
fix according to comments
Browse files Browse the repository at this point in the history
Signed-off-by: YANGDB <[email protected]>

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Aug 28, 2024
1 parent 9d33117 commit e198554
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class FlintSparkPPLGrokITSuite
// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
// Define the expected results
val expectedResults: Array[Row] = Array(
Row("[email protected]", "domain.net"),
Row("[email protected]", "anotherdomain.com"),
Expand All @@ -67,13 +66,11 @@ class FlintSparkPPLGrokITSuite
val emailAttribute = UnresolvedAttribute("email")
val hostAttribute = UnresolvedAttribute("host")
val hostExpression = Alias(
Coalesce(
Seq(
RegExpExtract(
emailAttribute,
Literal(
".+@(?<name0>\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"),
Literal("1")))),
RegExpExtract(
emailAttribute,
Literal(
".+@(?<name0>\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"),
Literal("1")),
"host")()
val expectedPlan = Project(
Seq(emailAttribute, hostAttribute),
Expand Down Expand Up @@ -105,13 +102,11 @@ class FlintSparkPPLGrokITSuite
val emailAttribute = UnresolvedAttribute("email")
val ageAttribute = UnresolvedAttribute("age")
val hostExpression = Alias(
Coalesce(
Seq(
RegExpExtract(
emailAttribute,
Literal(
".+@(?<name0>\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"),
Literal(1)))),
RegExpExtract(
emailAttribute,
Literal(
".+@(?<name0>\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"),
Literal(1)),
"host")()

// Define the corrected expected plan
Expand Down Expand Up @@ -156,13 +151,11 @@ class FlintSparkPPLGrokITSuite
val emailAttribute = UnresolvedAttribute("email")
val hostAttribute = UnresolvedAttribute("host")
val hostExpression = Alias(
Coalesce(
Seq(
RegExpExtract(
emailAttribute,
Literal(
".+@(?<name0>\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"),
Literal(1)))),
RegExpExtract(
emailAttribute,
Literal(
".+@(?<name0>\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"),
Literal(1)),
"host")()

// Define the corrected expected plan
Expand Down Expand Up @@ -201,13 +194,11 @@ class FlintSparkPPLGrokITSuite
val emailAttribute = UnresolvedAttribute("email")
val hostAttribute = UnresolvedAttribute("host")
val hostExpression = Alias(
Coalesce(
Seq(
RegExpExtract(
emailAttribute,
Literal(
".+@(?<name0>\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"),
Literal(1)))),
RegExpExtract(
emailAttribute,
Literal(
".+@(?<name0>\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"),
Literal(1)),
"host")()

val sortedPlan = Sort(
Expand Down Expand Up @@ -270,21 +261,20 @@ class FlintSparkPPLGrokITSuite
val timestampAttribute = UnresolvedAttribute("timestamp")
val responseAttribute = UnresolvedAttribute("response")
val bytesAttribute = UnresolvedAttribute("bytes")
// scalastyle:off
val expectedRegExp =
"(?<name0>(?<name1>(?:(?<name2>\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))|(?<name3>(?:(?<name4>((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:)))(%.+)?)|(?<name5>(?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])))))) (?<name6>(?<name7>[a-zA-Z0-9._-]+)) (?<name8>(?<name9>[a-zA-Z0-9._-]+)) \\[(?<name10>(?<name11>(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]))/(?<name12>\\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\\b)/(?<name13>(?>\\d\\d){1,2}):(?<name14>(?!<[0-9])(?<name15>(?:2[0123]|[01]?[0-9])):(?<name16>(?:[0-5][0-9]))(?::(?<name17>(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)))(?![0-9])) (?<name18>(?:[+-]?(?:[0-9]+))))\\] 
\"(?:(?<name19>\\b\\w+\\b) (?<name20>\\S+)(?: HTTP/(?<name21>(?:(?<name22>(?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))))))?|(?<name23>.*?))\" (?<name24>(?:(?<name25>(?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))))) (?:(?<name26>(?:(?<name27>(?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+))))))|-))"
// scalastyle:on

val COMMONAPACHELOG = Alias(
Coalesce(Seq(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("1")))),
RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("1")),
"COMMONAPACHELOG")()
val timestamp = Alias(
Coalesce(Seq(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("5")))),
"timestamp")()
val response = Alias(
Coalesce(Seq(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("18")))),
"response")()
val bytes = Alias(
Coalesce(Seq(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("19")))),
"bytes")()
val timestamp =
Alias(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("5")), "timestamp")()
val response =
Alias(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("18")), "response")()
val bytes =
Alias(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("19")), "bytes")()
val expectedPlan = Project(
Seq(logAttribute, timestampAttribute, responseAttribute, bytesAttribute),
Project(
Expand Down Expand Up @@ -322,13 +312,11 @@ class FlintSparkPPLGrokITSuite
val street_addressAttribute = UnresolvedAttribute("street_address")
val addressAttribute = UnresolvedAttribute("address")
val addressExpression = Alias(
Coalesce(
Seq(
RegExpExtract(
street_addressAttribute,
Literal(
"(?<name0>(?:(?<name1>(?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))))) (?<name2>.*)"),
Literal("3")))),
RegExpExtract(
street_addressAttribute,
Literal(
"(?<name0>(?:(?<name1>(?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))))) (?<name2>.*)"),
Literal("3")),
"address")()
val expectedPlan = Project(
Seq(addressAttribute),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ class FlintSparkPPLParseITSuite
// Define the expected logical plan
val emailAttribute = UnresolvedAttribute("email")
val hostAttribute = UnresolvedAttribute("host")
val hostExpression = Alias(
Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(?<host>.+)"), Literal("1")))),
"host")()
val hostExpression =
Alias(RegExpExtract(emailAttribute, Literal(".+@(?<host>.+)"), Literal("1")), "host")()
val expectedPlan = Project(
Seq(emailAttribute, hostAttribute),
Project(
Expand Down Expand Up @@ -102,9 +101,8 @@ class FlintSparkPPLParseITSuite
// Define the expected logical plan
val emailAttribute = UnresolvedAttribute("email")
val ageAttribute = UnresolvedAttribute("age")
val hostExpression = Alias(
Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(?<host>.+)"), Literal(1)))),
"host")()
val hostExpression =
Alias(RegExpExtract(emailAttribute, Literal(".+@(?<host>.+)"), Literal(1)), "host")()

// Define the corrected expected plan
val expectedPlan = Project(
Expand Down Expand Up @@ -147,9 +145,8 @@ class FlintSparkPPLParseITSuite
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val emailAttribute = UnresolvedAttribute("email")
val hostAttribute = UnresolvedAttribute("host")
val hostExpression = Alias(
Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(?<host>.+)"), Literal(1)))),
"host")()
val hostExpression =
Alias(RegExpExtract(emailAttribute, Literal(".+@(?<host>.+)"), Literal(1)), "host")()

// Define the corrected expected plan
val expectedPlan = Project(
Expand Down Expand Up @@ -186,9 +183,8 @@ class FlintSparkPPLParseITSuite

val emailAttribute = UnresolvedAttribute("email")
val hostAttribute = UnresolvedAttribute("host")
val hostExpression = Alias(
Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(?<host>.+)"), Literal(1)))),
"host")()
val hostExpression =
Alias(RegExpExtract(emailAttribute, Literal(".+@(?<host>.+)"), Literal(1)), "host")()

val sortedPlan = Sort(
Seq(
Expand Down Expand Up @@ -242,21 +238,17 @@ class FlintSparkPPLParseITSuite
val streetAttribute = UnresolvedAttribute("street")

val streetNumberExpression = Alias(
Coalesce(
Seq(
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("1")))),
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("1")),
"streetNumber")()

val streetExpression = Alias(
Coalesce(
Seq(
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("2")))),
RegExpExtract(
addressAttribute,
Literal("(?<streetNumber>\\d+) (?<street>.+)"),
Literal("2")),
"street")()

val expectedPlan = Project(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,21 @@
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.expressions.AttributeReference;
import org.apache.spark.sql.catalyst.expressions.Ascending$;
import org.apache.spark.sql.catalyst.expressions.Coalesce;
import org.apache.spark.sql.catalyst.expressions.Descending$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.Predicate;
import org.apache.spark.sql.catalyst.expressions.RegExpExtract;
import org.apache.spark.sql.catalyst.expressions.SortDirection;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.expressions.StringRegexExpression;
import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$;
import org.apache.spark.sql.catalyst.plans.logical.Deduplicate;
import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.command.DescribeTableCommand;
import org.apache.spark.sql.catalyst.plans.logical.Union;
import org.apache.spark.sql.execution.command.DescribeTableCommand;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.AggregateFunction;
Expand Down Expand Up @@ -76,25 +71,20 @@
import org.opensearch.sql.ppl.utils.BuiltinFunctionTranslator;
import org.opensearch.sql.ppl.utils.ComparatorTransformer;
import org.opensearch.sql.ppl.utils.ParseStrategy;
import org.opensearch.sql.ppl.utils.ParseUtils;
import org.opensearch.sql.ppl.utils.SortUtils;
import scala.Option;
import scala.Option$;
import scala.collection.Seq;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static java.util.List.of;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation;
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.translate;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.opensearch.sql.ppl.utils;

import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.expressions.Coalesce;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.RegExpExtract;
Expand All @@ -22,11 +21,10 @@
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
import static org.opensearch.sql.ppl.utils.ParseUtils.GrokExpression.getNamedGroupIndex;

public interface ParseStrategy {
/**
* transform the parse/grok/patterns command into a standard catalyst RegExpExtract expression wrapped by a Coalesce to handle potential null values
* Transform the parse/grok/patterns command into a standard Catalyst RegExpExtract expression.
* Since Spark's RegExpExtract can't accept an actual regex group name, we need to translate the group's name into its corresponding index.
*
* @param node
Expand Down Expand Up @@ -64,11 +62,9 @@ static LogicalPlan visitParseCommand(Parse node, Expression sourceField, ParseMe
RegExpExtract regExpExtract = new RegExpExtract(sourceField,
org.apache.spark.sql.catalyst.expressions.Literal.create(cleanedPattern, StringType),
org.apache.spark.sql.catalyst.expressions.Literal.create(index + 1, IntegerType));
//next create Coalesce to handle potential null values
Coalesce coalesce = new Coalesce(seq(regExpExtract));
//next Alias the extracted fields
context.getNamedParseExpressions().push(
org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(coalesce,
org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(regExpExtract,
group,
NamedExpression.newExprId(),
seq(new java.util.ArrayList<String>()),
Expand Down
Loading

0 comments on commit e198554

Please sign in to comment.