Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disallow queries that reference catalog views #641

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
import org.apache.spark.sql.catalyst.rules.Rule

/**
* This analysis rule validates that the submitted query is not referencing a Glue Catalog view.
* The rule simply traverses the plan and validates that none of the nodes resolved to a [[View]].
*/
class DisallowCatalogViews extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.foreachUp {
case _: View =>
throw new IllegalArgumentException(s"Catalog View is not allowed to be queried")

case other => other
}
plan
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class FlintSparkExtensions extends (SparkSessionExtensions => Unit) {
}

extensions.injectFunction(TumbleFunction.description)

extensions.injectOptimizerRule { spark =>
new FlintSparkOptimizer(spark)
}
extensions.injectPostHocResolutionRule(_ => new DisallowCatalogViews())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
| ('F', 35), ('G', 40), ('H', 45), ('I', 50), ('J', 55)
| """.stripMargin)

// Create a view
sql(s"CREATE VIEW myView AS SELECT name, age FROM $testTable WHERE age > 20")

// Mock static create method in FlintClientBuilder used by Flint data source
clientBuilder
.when(() => FlintClientBuilder.build(any(classOf[FlintOptions])))
Expand Down Expand Up @@ -141,6 +144,12 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
filterCondition = Some("name = 'A' OR age > 30")))
.assertIndexNotUsed(testTable)
}
test("test querying catalog views throws validation exception") {
the[IllegalArgumentException] thrownBy {
sql(s"SELECT * FROM myview WHERE name = 'A' AND age > 30")
} should have message "Catalog View is not allowed to be queried"

}

test("should not apply if covering index is logically deleted") {
assertFlintQueryRewriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
}

protected def createPartitionedGrokEmailTable(testTable: String): Unit = {
spark.sql(s"""
val table = spark.sql(s"""
| CREATE TABLE $testTable
| (
| name STRING,
Expand All @@ -115,7 +115,6 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| month INT
| )
|""".stripMargin)

val data = Seq(
("Alice", 30, "[email protected]", "123 Main St, Seattle", 2023, 4),
("Bob", 55, "[email protected]", "456 Elm St, Portland", 2023, 5),
Expand All @@ -135,6 +134,17 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| VALUES ('$name', $age, '$email', '$street_address')
| """.stripMargin)
}
// Create a Glue view
val glueViewName = "my_view"
val glueViewQuery = s"""
CREATE VIEW $glueViewName
AS
SELECT name, age, email, street_address
FROM $testTable
"""

val res = spark.sql(glueViewQuery)
log.error(res.toString())
}
protected def createPartitionedAddressTable(testTable: String): Unit = {
sql(s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.flint.spark.ppl

import org.scalatest.matchers.must.Matchers.have
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, Descending, GreaterThan, Literal, NullsLast, RegExpExtract, SortOrder}
Expand Down Expand Up @@ -36,6 +39,14 @@ class FlintSparkPPLGrokITSuite
}
}

test("test querying catalog views throws validation exception") {
the[IllegalArgumentException] thrownBy {
sql("""
| source = my_view| grok email '.+@%{HOSTNAME:host}' | fields email, host
| """.stripMargin)
} should have message "Catalog View is not allowed to be queried"
}

test("test grok email expressions parsing") {
val frame = sql(s"""
| source = $testTable| grok email '.+@%{HOSTNAME:host}' | fields email, host
Expand Down
Loading