From b178e543f26d032f79c71cccaab74e94564a4b32 Mon Sep 17 00:00:00 2001 From: hezhao2 Date: Wed, 13 Dec 2023 14:19:25 +0800 Subject: [PATCH] enable MaxScanStrategy when accessing iceberg datasource --- .../spark/kyuubi-extension-spark-3-4/pom.xml | 6 ++ .../spark/source/IcebergSparkPlanHelper.scala | 33 ++++++++++ .../kyuubi/sql/watchdog/MaxScanStrategy.scala | 63 +++++++++++++++++-- 3 files changed, 97 insertions(+), 5 deletions(-) create mode 100644 extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala diff --git a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml index ee5b5f1558a..4665b88644f 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/pom.xml +++ b/extensions/spark/kyuubi-extension-spark-3-4/pom.xml @@ -133,6 +133,12 @@ log4j-slf4j-impl test + + + org.apache.iceberg + iceberg-spark-runtime-${spark.binary.version}_${scala.binary.version} + ${iceberg.version} + diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala new file mode 100644 index 00000000000..fd8509b8120 --- /dev/null +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/iceberg/spark/source/IcebergSparkPlanHelper.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.spark.source + +import org.apache.spark.sql.connector.read.Scan + +object IcebergSparkPlanHelper { + + type SparkBatchQueryScan = org.apache.iceberg.spark.source.SparkBatchQueryScan + + def numPartitions(scan: Scan): Long = { + scan match { + case sparkBatchQueryScan: SparkBatchQueryScan => + sparkBatchQueryScan.outputPartitioning().numPartitions() + case _ => + 0L + } + } +} diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala index 1ed55ebc2fd..7d7e25dd15b 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi.sql.watchdog import org.apache.hadoop.fs.Path +import org.apache.iceberg.spark.source.IcebergSparkPlanHelper.numPartitions import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy} import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation} @@ -26,8 +27,12 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, 
InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.types.StructType - import org.apache.kyuubi.sql.KyuubiSQLConf +import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation + +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` + /** * Add MaxScanStrategy to avoid scan excessive partitions or files @@ -118,10 +123,15 @@ case class MaxScanStrategy(session: SparkSession) } else { lazy val scanFileSize = relation.tableMeta.stats.map(_.sizeInBytes).sum if (maxFileSizeOpt.exists(_ < scanFileSize)) { - throw nonPartTableMaxFileExceedError( - scanFileSize, - maxFileSizeOpt.get, - Some(relation.tableMeta)) + throw new MaxFileSizeExceedException( + s""" + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |You should optimize your SQL logic or shorten the query scope, + |detail as below: + |Table: ${relation.tableMeta.qualifiedName} + |Owner: ${relation.tableMeta.owner} + |""".stripMargin) } } case ScanOperation( @@ -232,6 +242,49 @@ case class MaxScanStrategy(session: SparkSession) } } + case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation) + if isIcebergRelation(relation.relation) => + val icebergTable = relation.relation. 
+ table.asInstanceOf[org.apache.iceberg.spark.source.SparkTable] + if (icebergTable.partitioning().nonEmpty) { + val partitionColumnNames = icebergTable.table().spec().fields().map(_.name()).seq + val stats = relation.computeStats() + lazy val scanFileSize = stats.sizeInBytes + lazy val scanPartitions = numPartitions(relation.scan) + if (maxFileSizeOpt.exists(_ < scanFileSize)) { + throw new MaxFileSizeExceedException( + s""" + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |You should optimize your SQL logical according partition structure + |or shorten query scope such as p_date, detail as below: + |Table: ${icebergTable.name()} + |Partition Structure: ${partitionColumnNames.mkString(",")} + |""".stripMargin) + } + if (maxScanPartitionsOpt.exists(_ < scanPartitions)) { + throw new MaxPartitionExceedException( + s""" + |SQL job scan partitions: $scanPartitions exceed restrict of table scan maxPartitions ${maxScanPartitionsOpt.get}, + |You should optimize your SQL logical according partition structure + |or shorten query scope such as p_date, detail as below: + |Table: ${icebergTable.name()} + |Partition Structure: ${partitionColumnNames.mkString(",")} + |""".stripMargin) + } + } else { + val stats = relation.computeStats() + lazy val scanFileSize = stats.sizeInBytes + if (maxFileSizeOpt.exists(_ < scanFileSize)) { + throw new MaxFileSizeExceedException( + s""" + |SQL job scan file size in bytes: $scanFileSize + |exceed restrict of table scan maxFileSize ${maxFileSizeOpt.get} + |detail as below: + |Table: ${icebergTable.name()} + |""".stripMargin) + } + } case _ => } }