Commit
Enable MaxScanStrategy when accessing Iceberg data sources
zhaohehuhu committed Dec 13, 2023
1 parent 8ab4763 commit b178e54
Showing 3 changed files with 97 additions and 5 deletions.
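For context, a minimal sketch of how the watchdog is typically switched on for a session once the extension is on the classpath. The extension class and the spark.sql.watchdog.maxPartitions / spark.sql.watchdog.maxFileSize keys are assumed from existing Kyuubi usage rather than taken from this commit, and the Iceberg table name is hypothetical.

import org.apache.spark.sql.SparkSession

object WatchdogIcebergExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("watchdog-iceberg-example")
      // Assumed Kyuubi extension entry point and watchdog limits (not part of this diff).
      .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
      .config("spark.sql.watchdog.maxPartitions", "100")
      .config("spark.sql.watchdog.maxFileSize", "10g")
      .getOrCreate()

    // With this commit, a scan of an Iceberg table whose size or partition count
    // exceeds the limits should fail fast with MaxFileSizeExceedException or
    // MaxPartitionExceedException instead of running the full scan.
    spark.sql("SELECT * FROM iceberg_catalog.db.huge_events").show()
  }
}
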
6 changes: 6 additions & 0 deletions extensions/spark/kyuubi-extension-spark-3-4/pom.xml
@@ -133,6 +133,12 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-${spark.binary.version}_${scala.binary.version}</artifactId>
<version>${iceberg.version}</version>
</dependency>
</dependencies>

<build>
New file: IcebergSparkPlanHelper.scala (+33 lines)
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iceberg.spark.source

import org.apache.spark.sql.connector.read.Scan

object IcebergSparkPlanHelper {

  type SparkBatchQueryScan = org.apache.iceberg.spark.source.SparkBatchQueryScan

  /**
   * Number of partitions reported by an Iceberg batch query scan,
   * or 0 for any other kind of scan.
   */
  def numPartitions(scan: Scan): Long = scan match {
    case batchScan: SparkBatchQueryScan => batchScan.outputPartitioning().numPartitions()
    case _ => 0L
  }
}
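The helper above is declared in the org.apache.iceberg.spark.source package, presumably so it can see Iceberg's SparkBatchQueryScan, which is not visible outside that package. Below is a small sketch (not part of the commit) of the fallback behavior: any scan that is not an Iceberg batch query scan reports 0 partitions, so the partition check is effectively skipped for it.

import org.apache.iceberg.spark.source.IcebergSparkPlanHelper
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.StructType

object NumPartitionsFallbackExample {
  // A trivial non-Iceberg Scan; readSchema() is the only abstract method of the interface.
  private class DummyScan extends Scan {
    override def readSchema(): StructType = new StructType()
  }

  def main(args: Array[String]): Unit = {
    // Prints 0: the helper only inspects Iceberg SparkBatchQueryScan instances.
    println(IcebergSparkPlanHelper.numPartitions(new DummyScan))
  }
}
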
File: MaxScanStrategy.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.sql.watchdog

import org.apache.hadoop.fs.Path
import org.apache.iceberg.spark.source.IcebergSparkPlanHelper.numPartitions
import org.apache.spark.sql.{PruneFileSourcePartitionHelper, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
@@ -26,8 +27,12 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

import org.apache.kyuubi.sql.KyuubiSQLConf

/**
* Add MaxScanStrategy to avoid scanning excessive partitions or files
@@ -118,10 +123,15 @@ case class MaxScanStrategy(session: SparkSession)
} else {
lazy val scanFileSize = relation.tableMeta.stats.map(_.sizeInBytes).sum
if (maxFileSizeOpt.exists(_ < scanFileSize)) {
throw nonPartTableMaxFileExceedError(
scanFileSize,
maxFileSizeOpt.get,
Some(relation.tableMeta))
throw new MaxFileSizeExceedException(
  s"""
     |Your SQL job scans a whole huge table without any partition filter.
     |You should optimize your SQL logic according to the partition structure
     |or narrow the query scope, e.g. with a partition filter such as p_date. Details:
     |Table: ${relation.tableMeta.qualifiedName}
     |Owner: ${relation.tableMeta.owner}
     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(", ")}
     |""".stripMargin)
}
}
case ScanOperation(
@@ -232,6 +242,49 @@ case class MaxScanStrategy(session: SparkSession)
logicalRelation.catalogTable)
}
}
case ScanOperation(_, _, _, relation: DataSourceV2ScanRelation)
    if isIcebergRelation(relation.relation) =>
  val icebergTable = relation.relation.table
    .asInstanceOf[org.apache.iceberg.spark.source.SparkTable]
  if (icebergTable.partitioning().nonEmpty) {
    val partitionColumnNames =
      icebergTable.table().spec().fields().asScala.map(_.name())
    val stats = relation.computeStats()
    lazy val scanFileSize = stats.sizeInBytes
    lazy val scanPartitions = numPartitions(relation.scan)
    if (maxFileSizeOpt.exists(_ < scanFileSize)) {
      throw new MaxFileSizeExceedException(
        s"""
           |SQL job scans $scanFileSize bytes of files,
           |exceeding the table scan limit maxFileSize ${maxFileSizeOpt.get}.
           |You should optimize your SQL logic according to the partition structure
           |or narrow the query scope, e.g. with a partition filter such as p_date. Details:
           |Table: ${icebergTable.name()}
           |Partition Structure: ${partitionColumnNames.mkString(", ")}
           |""".stripMargin)
    }
    if (maxScanPartitionsOpt.exists(_ < scanPartitions)) {
      throw new MaxPartitionExceedException(
        s"""
           |Your SQL job scans $scanPartitions partitions,
           |exceeding the table scan limit maxPartitions ${maxScanPartitionsOpt.get}.
           |You should optimize your SQL logic according to the partition structure
           |or narrow the query scope, e.g. with a partition filter such as p_date. Details:
           |Table: ${icebergTable.name()}
           |Partition Structure: ${partitionColumnNames.mkString(", ")}
           |""".stripMargin)
    }
  } else {
    val stats = relation.computeStats()
    lazy val scanFileSize = stats.sizeInBytes
    if (maxFileSizeOpt.exists(_ < scanFileSize)) {
      throw new MaxFileSizeExceedException(
        s"""
           |SQL job scans $scanFileSize bytes of files,
           |exceeding the table scan limit maxFileSize ${maxFileSizeOpt.get}.
           |Details:
           |Table: ${icebergTable.name()}
           |""".stripMargin)
    }
  }
case _ =>
}
}
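Finally, a hedged sketch of how the new behavior could be verified end to end. The local Iceberg catalog setup, the extension class, and the watchdog config key are assumptions for illustration; any test suites that accompany this commit are not shown in the excerpt above.

import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

import org.apache.kyuubi.sql.watchdog.MaxFileSizeExceedException

class IcebergMaxScanStrategySketch extends AnyFunSuite {

  test("oversized iceberg scan is rejected by the watchdog") {
    val spark = SparkSession.builder()
      .master("local[1]")
      // Assumed Kyuubi extension entry point and a local Hadoop-backed Iceberg catalog.
      .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
      .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.local.type", "hadoop")
      .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
      .getOrCreate()
    try {
      spark.sql("CREATE TABLE IF NOT EXISTS local.db.t (id BIGINT) USING iceberg")
      spark.sql("INSERT INTO local.db.t VALUES (1), (2), (3)")
      // An absurdly small limit so that any scan of the table trips the check.
      spark.conf.set("spark.sql.watchdog.maxFileSize", "1")
      intercept[MaxFileSizeExceedException] {
        spark.sql("SELECT * FROM local.db.t").collect()
      }
    } finally {
      spark.stop()
    }
  }
}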
