diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/DisallowCatalogViews.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/DisallowCatalogViews.scala new file mode 100644 index 000000000..8de43e0c5 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/DisallowCatalogViews.scala @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View} +import org.apache.spark.sql.catalyst.rules.Rule + +/** + * This analysis rule validates that the submitted query is not referencing a Glue Catalog view. + * The rule simply traverses the plan and validates that none of the nodes resolved to a [[View]]. + */ +class DisallowCatalogViews extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.foreachUp { + case _: View => + throw new IllegalArgumentException("Catalog View is not allowed to be queried") + + case other => other + } + plan + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkExtensions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkExtensions.scala index b33e7cc45..f7a997e5d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkExtensions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkExtensions.scala @@ -21,9 +21,10 @@ class FlintSparkExtensions extends (SparkSessionExtensions => Unit) { } extensions.injectFunction(TumbleFunction.description) extensions.injectOptimizerRule { spark => new FlintSparkOptimizer(spark) } + extensions.injectPostHocResolutionRule(_ => new DisallowCatalogViews()) } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala 
b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index 2c5518778..5aed6514f 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -55,6 +55,9 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { | ('F', 35), ('G', 40), ('H', 45), ('I', 50), ('J', 55) | """.stripMargin) + // Create a view + sql(s"CREATE VIEW myView AS SELECT name, age FROM $testTable WHERE age > 20") + // Mock static create method in FlintClientBuilder used by Flint data source clientBuilder .when(() => FlintClientBuilder.build(any(classOf[FlintOptions]))) @@ -141,6 +144,12 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { filterCondition = Some("name = 'A' OR age > 30"))) .assertIndexNotUsed(testTable) } + test("test querying catalog views throws validation exception") { + the[IllegalArgumentException] thrownBy { + sql("SELECT * FROM myview WHERE name = 'A' AND age > 30") + } should have message "Catalog View is not allowed to be queried" + + } test("should not apply if covering index is logically deleted") { assertFlintQueryRewriter diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 3f843dbe4..5503a9f76 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -101,7 +101,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit } protected def createPartitionedGrokEmailTable(testTable: String): Unit = { spark.sql(s""" | CREATE TABLE $testTable | ( | name STRING, @@ 
-115,7 +115,6 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit | month INT | ) |""".stripMargin) - val data = Seq( ("Alice", 30, "alice@example.com", "123 Main St, Seattle", 2023, 4), ("Bob", 55, "bob@test.org", "456 Elm St, Portland", 2023, 5), @@ -135,6 +134,16 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit | VALUES ('$name', $age, '$email', '$street_address') | """.stripMargin) } + // Create a Glue view + val glueViewName = "my_view" + val glueViewQuery = s""" + CREATE VIEW $glueViewName + AS + SELECT name, age, email, street_address + FROM $testTable + """ + + spark.sql(glueViewQuery) } protected def createPartitionedAddressTable(testTable: String): Unit = { sql(s""" diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGrokITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGrokITSuite.scala index 3e6e9bd29..6da891c32 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGrokITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGrokITSuite.scala @@ -5,6 +5,9 @@ package org.opensearch.flint.spark.ppl +import org.scalatest.matchers.must.Matchers.have +import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} + import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, Descending, GreaterThan, Literal, NullsLast, RegExpExtract, SortOrder} @@ -36,6 +39,14 @@ class FlintSparkPPLGrokITSuite } } + test("test querying catalog views throws validation exception") { + the[IllegalArgumentException] thrownBy { + sql(""" + | source = my_view| grok email '.+@%{HOSTNAME:host}' | fields email, host + | """.stripMargin) + 
} should have message "Catalog View is not allowed to be queried" + } + test("test grok email expressions parsing") { val frame = sql(s""" | source = $testTable| grok email '.+@%{HOSTNAME:host}' | fields email, host