Skip to content

Commit

Permalink
Merge pull request opensearch-project#56 from YANG-DB/add_optimzer_su…
Browse files Browse the repository at this point in the history
…pport_and_mvn_publish

Add  ppl-spark-extension optimzer support & mvn publish
  • Loading branch information
vamsi-amazon authored Oct 4, 2023
2 parents 654cb6d + f124d60 commit 6d87b81
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 63 deletions.
1 change: 1 addition & 0 deletions .github/workflows/snapshot-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
- name: Publish to Local Maven
run: |
sbt standaloneCosmetic/publishM2
sbt sparkPPLCosmetic/publishM2
sbt sparkSqlApplicationCosmetic/publishM2
- uses: actions/checkout@v3
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ To use PPL to Spark translation, you can run Spark with PPL extension:
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
```

### Running With both Extension
```
spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'"
```

## Build

To build and run this application with Spark, you can run:
Expand Down
59 changes: 59 additions & 0 deletions docs/PPL-on-Spark.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Running PPL On Spark Reference Manual

## Overview

This module provides the support for running [PPL](https://github.com/opensearch-project/piped-processing-language) queries on Spark using direct logical plan
translation between PPL's logical plan to Spark's Catalyst logical plan.

### What is PPL ?
OpenSearch PPL, or Pipe Processing Language, is a query language used with the OpenSearch platform and now Apache Spark.
PPL allows users to retrieve, query, and analyze data by using commands that are piped together, making it easier to understand and compose complex queries.
Its syntax is inspired by Unix pipes, which enables chaining of commands to transform and process data.
With PPL, users can filter, aggregate, and visualize data in multiple datasources in a more intuitive manner compared to traditional query languages

### Context

The next concepts are the main purpose of introduction this functionality:
- Transforming PPL to become OpenSearch default query language (specifically for logs/traces/metrics signals)
- Promoting PPL as a viable candidate for the proposed CNCF Observability universal query language.
- Seamlessly Interact with different datasources such as S3 / Prometheus / data-lake leveraging spark execution.
- Using spark's federative capabilities as a general purpose query engine to facilitate complex queries including joins
- Improve and promote PPL to become extensible and general purpose query language to be adopted by the community


### Running PPL Commands:

In order to run PPL commands, you will need to perform the following tasks:

#### PPL Build & Run

To build and run this PPL in Spark, you can run:

```
sbt clean sparkPPLCosmetic/publishM2
```
then add org.opensearch:opensearch-spark_2.12 when run spark application, for example,
```
bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT"
```

### PPL Extension Usage

To use PPL to Spark translation, you can run Spark with PPL extension:

```
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
```

### Running With both Flint & PPL Extensions
In order to make use of both flint and ppl extension, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.1.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.1.0-SNAPSHOT`) to the cluster's
classpath.

Next need to configure both extensions :
```
spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'"
```

Once this is done, spark will allow both extensions to parse the query (SQL / PPL) and allow the correct execution of the query.
In addition, PPL queries will enjoy the acceleration capabilities supported by the Flint plugins as described [here](index.md)

Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark
package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.{FlintPPLSparkExtensions, FlintSparkExtensions}

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.flint.config.FlintSparkConf.OPTIMIZER_RULE_ENABLED
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

trait FlintPPLSuite extends SharedSparkSession {
override protected def sparkConf = {
override protected def sparkConf: SparkConf = {
val conf = new SparkConf()
.set("spark.ui.enabled", "false")
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
Expand All @@ -21,7 +24,12 @@ trait FlintPPLSuite extends SharedSparkSession {
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set("spark.sql.extensions", classOf[FlintPPLSparkExtensions].getName)
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
.set(
"spark.sql.extensions",
List(classOf[FlintPPLSparkExtensions].getName, classOf[FlintSparkExtensions].getName)
.mkString(", "))
.set(OPTIMIZER_RULE_ENABLED.key, "false")
conf
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, Divide, Floor, Literal, Multiply, SortOrder}
Expand All @@ -20,7 +18,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
with StreamTest {

/** Test table and index name */
private val testTable = "default.flint_ppl_test"
private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()
Expand Down Expand Up @@ -92,7 +90,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val aggregateExpressions =
Alias(UnresolvedFunction(Seq("COUNT"), Seq(ageField), isDistinct = false), "count(age)")()
Expand Down Expand Up @@ -132,7 +130,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val aggregateExpressions =
Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")()
Expand Down Expand Up @@ -161,7 +159,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val aggregateExpressions =
Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")()
Expand Down Expand Up @@ -203,7 +201,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val countryField = UnresolvedAttribute("country")
val countryAlias = Alias(countryField, "country")()

Expand Down Expand Up @@ -239,7 +237,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val countryField = UnresolvedAttribute("country")
val countryAlias = Alias(countryField, "country")()

Expand Down Expand Up @@ -272,7 +270,7 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val countryField = UnresolvedAttribute("country")
val countryAlias = Alias(countryField, "country")()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, EqualTo, LessThan, Literal, Not, SortOrder}
Expand All @@ -20,7 +18,7 @@ class FlintSparkPPLAggregationsITSuite
with StreamTest {

/** Test table and index name */
private val testTable = "default.flint_ppl_test"
private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()
Expand Down Expand Up @@ -86,7 +84,7 @@ class FlintSparkPPLAggregationsITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val aggregateExpressions =
Seq(Alias(UnresolvedFunction(Seq("AVG"), Seq(ageField), isDistinct = false), "avg(age)")())
val aggregatePlan = Aggregate(Seq(), aggregateExpressions, table)
Expand Down Expand Up @@ -116,7 +114,7 @@ class FlintSparkPPLAggregationsITSuite
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val filterExpr = LessThan(ageField, Literal(50))
val filterPlan = Filter(filterExpr, table)
val aggregateExpressions =
Expand Down Expand Up @@ -148,7 +146,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -177,7 +175,7 @@ class FlintSparkPPLAggregationsITSuite
// Define the expected logical plan
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -213,7 +211,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -248,7 +246,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -283,7 +281,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -317,7 +315,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -358,7 +356,7 @@ class FlintSparkPPLAggregationsITSuite
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down Expand Up @@ -396,7 +394,7 @@ class FlintSparkPPLAggregationsITSuite
val stateField = UnresolvedAttribute("state")
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("default", "flint_ppl_test"))
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Expand Down
Loading

0 comments on commit 6d87b81

Please sign in to comment.