diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
index 832cc084bb4..ebd9e7a3e4f 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.delta.{GlutenOptimisticTransaction, OptimisticTransaction, TransactionExecutionObserver}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableCommand}
 import org.apache.spark.sql.execution.metric.SQLMetric
 
 case class GlutenDeltaLeafV2CommandExec(delegate: LeafV2CommandExec) extends LeafV2CommandExec {
@@ -59,6 +59,25 @@ case class GlutenDeltaLeafRunnableCommand(delegate: LeafRunnableCommand)
   override def nodeName: String = "GlutenDelta " + delegate.nodeName
 }
 
+/**
+ * Leaf command that forwards metrics, output and execution to a wrapped [[RunnableCommand]],
+ * running it with the `UseColumnarDeltaTransactionLog` observer installed.
+ */
+case class GlutenDeltaRunnableCommand(delegate: RunnableCommand) extends LeafRunnableCommand {
+  override lazy val metrics: Map[String, SQLMetric] = delegate.metrics
+
+  override def output: Seq[Attribute] = delegate.output
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val observer = DeltaV2WriteOperators.UseColumnarDeltaTransactionLog
+    TransactionExecutionObserver.withObserver(observer) {
+      delegate.run(sparkSession)
+    }
+  }
+
+  override def nodeName: String = "GlutenDelta " + delegate.nodeName
+}
+
 object DeltaV2WriteOperators {
   object UseColumnarDeltaTransactionLog extends TransactionExecutionObserver {
     override def startingTransaction(f: => OptimisticTransaction): OptimisticTransaction = {
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
index 4f3e96585cc..f4b74e4dbec 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
@@ -20,13 +20,14 @@ import org.apache.gluten.config.VeloxDeltaConfig
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 
 import org.apache.spark.sql.delta.catalog.DeltaCatalog
-import org.apache.spark.sql.delta.commands.{DeleteCommand, UpdateCommand}
+import org.apache.spark.sql.delta.commands.{DeleteCommand, DeltaCommand, OptimizeTableCommand, UpdateCommand}
+import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
 import org.apache.spark.sql.delta.sources.DeltaDataSource
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
 import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
 
-case class OffloadDeltaCommand() extends OffloadSingleNode {
+case class OffloadDeltaCommand() extends OffloadSingleNode with DeltaCommand {
   override def offload(plan: SparkPlan): SparkPlan = {
     if (!VeloxDeltaConfig.get.enableNativeWrite) {
       return plan
@@ -36,6 +37,8 @@ case class OffloadDeltaCommand() extends OffloadSingleNode {
         ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(uc))
       case ExecutedCommandExec(dc: DeleteCommand) =>
         ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(dc))
+      case ExecutedCommandExec(optimize: OptimizeTableCommand) if shouldOffloadOptimize(optimize) =>
+        ExecutedCommandExec(GlutenDeltaRunnableCommand(optimize))
       case ExecutedCommandExec(s @ SaveIntoDataSourceCommand(_, _: DeltaDataSource, _, _)) =>
         ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(s))
       case ctas: AtomicCreateTableAsSelectExec if ctas.catalog.isInstanceOf[DeltaCatalog] =>
@@ -45,4 +48,34 @@ case class OffloadDeltaCommand() extends OffloadSingleNode {
       case other => other
     }
   }
+
+  /**
+   * Decides whether an OPTIMIZE command is wrapped for command offload.
+   *
+   * Currently only plain OPTIMIZE bin-packing is supported for command offload. OPTIMIZE
+   * variants with layout-specific semantics, such as ZORDER, REORG, OPTIMIZE FULL, or
+   * liquid clustering, continue to use Delta's original command path. If the clustering
+   * check itself fails (it resolves the target table while the plan is being rewritten),
+   * the command is likewise left on Delta's original command path.
+   */
+  private def shouldOffloadOptimize(optimize: OptimizeTableCommand): Boolean = {
+    val isPlainBinPacking =
+      optimize.zOrderBy.isEmpty &&
+        optimize.optimizeContext.reorg.isEmpty &&
+        !optimize.optimizeContext.isFull
+    // Flag checks short-circuit first so the table is only resolved when needed.
+    isPlainBinPacking && {
+      try {
+        !isClusteredOptimize(optimize)
+      } catch {
+        case scala.util.control.NonFatal(_) => false
+      }
+    }
+  }
+
+  /** Returns true when the refreshed snapshot's protocol supports clustered tables. */
+  private def isClusteredOptimize(optimize: OptimizeTableCommand): Boolean = {
+    val snapshot = getDeltaTable(optimize.child, "OPTIMIZE").update()
+    ClusteredTableUtils.isSupported(snapshot.protocol)
+  }
 }
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
new file mode 100644
index 00000000000..26205711869
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.VeloxDeltaConfig
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.datasources.v2.{GlutenDeltaLeafRunnableCommand, GlutenDeltaLeafV2CommandExec, GlutenDeltaRunnableCommand}
+import org.apache.spark.sql.internal.SQLConf
+
+class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
+
+  import testImplicits._
+
+  private lazy val isMac = sys.props
+    .get("os.name")
+    .exists(_.toLowerCase(java.util.Locale.ROOT).contains("mac"))
+
+  private def withNativeWriteOffloadConf(f: => Unit): Unit = {
+    val confs = Seq(
+      SQLConf.ANSI_ENABLED.key -> "false",
+      SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
+      GlutenConfig.GLUTEN_ANSI_FALLBACK_ENABLED.key -> "false",
+      DeltaSQLConf.DELTA_COLLECT_STATS.key -> "false"
+    ) ++
+      (if (isMac) {
+         Seq(GlutenConfig.NATIVE_VALIDATION_ENABLED.key -> "false")
+       } else {
+         Seq.empty
+       })
+
+    withSQLConf(confs: _*) {
+      f
+    }
+  }
+
+  private def hasGlutenDeltaWriteCommand(plan: SparkPlan): Boolean = {
+    val nativeClassMatch = plan
+      .collectFirst {
+        case ExecutedCommandExec(_: GlutenDeltaLeafRunnableCommand) => true
+        case ExecutedCommandExec(_: GlutenDeltaRunnableCommand) => true
+        case _: GlutenDeltaLeafV2CommandExec => true
+      }
+      .getOrElse(false)
+
+    val nativeNodeMatch = plan
+      .collectFirst {
+        case p if p.nodeName.startsWith("Execute GlutenDelta ") => true
+        case p if p.nodeName.startsWith("GlutenDelta ") => true
+      }
+      .getOrElse(false)
+
+    val nativeTreeMatch = plan.treeString.contains("GlutenDelta ")
+
+    nativeClassMatch || nativeNodeMatch || nativeTreeMatch
+  }
+
+  private def assertContainsNativeWriteCommand(plan: SparkPlan, context: String): Unit = {
+    assert(
+      hasGlutenDeltaWriteCommand(plan),
+      s"Expected native delta write command for $context, but got plan:\n${plan.treeString}")
+  }
+
+  private def assertNoNativeWriteCommand(plan: SparkPlan, context: String): Unit = {
+    assert(
+      !hasGlutenDeltaWriteCommand(plan),
+      s"Expected no native delta write command for $context, but got plan:\n${plan.treeString}")
+  }
+
+  private def files(deltaLog: DeltaLog): Set[AddFile] = {
+    deltaLog.update().allFiles.collect().toSet
+  }
+
+  private def collectOptimizeMetrics(df: DataFrame): OptimizeMetrics = {
+    val metrics = df.select("metrics.*").as[OptimizeMetrics].collect()
+    assert(metrics.length == 1, s"Expected one OPTIMIZE result row, got ${metrics.length}")
+    metrics.head
+  }
+
+  private def assertOptimizeCommit(deltaLog: DeltaLog, context: String): Unit = {
+    val latestCommit = deltaLog.history.getHistory(Some(1)).head
+    assert(
+      latestCommit.operation == "OPTIMIZE",
+      s"Expected latest Delta operation for $context to be OPTIMIZE, got " +
+        latestCommit.operation)
+  }
+
+  private def assertCompactionMetrics(
+      metrics: OptimizeMetrics,
+      beforeFileCount: Int,
+      afterFileCount: Int,
+      context: String,
+      expectedPartitionsOptimized: Option[Long] = None): Unit = {
+    assert(metrics.numFilesRemoved > 0, s"Expected files removed for $context")
+    assert(metrics.numFilesAdded > 0, s"Expected files added for $context")
+    assert(
+      afterFileCount < beforeFileCount,
+      s"Expected fewer active files after $context, before=$beforeFileCount after=$afterFileCount")
+    assert(
+      metrics.numFilesRemoved > metrics.numFilesAdded,
+      s"Expected $context to compact to fewer files, removed=${metrics.numFilesRemoved} " +
+        s"added=${metrics.numFilesAdded}")
+    assert(
+      metrics.filesRemoved.totalFiles == metrics.numFilesRemoved,
+      s"Removed file metrics did not match numFilesRemoved for $context")
+    assert(
+      metrics.filesAdded.totalFiles == metrics.numFilesAdded,
+      s"Added file metrics did not match numFilesAdded for $context")
+    assert(metrics.filesRemoved.totalSize > 0, s"Expected removed file size metrics for $context")
+    assert(metrics.filesAdded.totalSize > 0, s"Expected added file size metrics for $context")
+    assert(metrics.numBatches > 0, s"Expected at least one optimize batch for $context")
+    expectedPartitionsOptimized.foreach {
+      expected =>
+        assert(
+          metrics.partitionsOptimized == expected,
+          s"Expected $expected optimized partitions for $context, got " +
+            metrics.partitionsOptimized)
+    }
+  }
+
+  test("native delta optimize command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 32, 1, 4).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(32, 64, 1, 4).toDF("id").write.format("delta").mode("append").save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+          assertContainsNativeWriteCommand(optimizeDf.queryExecution.executedPlan, "OPTIMIZE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, "path OPTIMIZE")
+          assertOptimizeCommit(deltaLog, "path OPTIMIZE")
+          val result = spark.read.format("delta").load(path)
+          // Compare sorted lists: a Set would hide rows duplicated by compaction.
+          assert(result.collect().map(_.getLong(0)).sorted.toList == (0L until 64L).toList)
+      }
+    }
+  }
+
+  test("native delta optimize table command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTable("delta_native_optimize_table") {
+        spark
+          .range(0, 32, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("overwrite")
+          .saveAsTable("delta_native_optimize_table")
+        spark
+          .range(32, 64, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("append")
+          .saveAsTable("delta_native_optimize_table")
+
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier("delta_native_optimize_table"))
+        val beforeFiles = files(deltaLog)
+
+        val optimizeDf = sql("OPTIMIZE delta_native_optimize_table")
+        assertContainsNativeWriteCommand(optimizeDf.queryExecution.executedPlan, "OPTIMIZE table")
+        val metrics = collectOptimizeMetrics(optimizeDf)
+
+        val afterFiles = files(deltaLog)
+        assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, "table OPTIMIZE")
+        assertOptimizeCommit(deltaLog, "table OPTIMIZE")
+        val result = spark.read.table("delta_native_optimize_table")
+        // Compare sorted lists: a Set would hide rows duplicated by compaction.
+        assert(result.collect().map(_.getLong(0)).sorted.toList == (0L until 64L).toList)
+      }
+    }
+  }
+
+  test("native delta optimize partition predicate command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark
+            .range(0, 20, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+          spark
+            .range(20, 40, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+          val beforePart0Paths = beforeFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val beforePart1Count = beforeFiles.count(_.partitionValues.get("part").contains("1"))
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path` WHERE part = 1")
+          assertContainsNativeWriteCommand(
+            optimizeDf.queryExecution.executedPlan,
+            "OPTIMIZE WHERE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          val afterPart0Paths = afterFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val afterPart1Count = afterFiles.count(_.partitionValues.get("part").contains("1"))
+          assert(
+            beforePart0Paths.subsetOf(afterPart0Paths),
+            "OPTIMIZE WHERE part = 1 should not remove files from part = 0")
+          assert(
+            afterPart1Count < beforePart1Count,
+            s"Expected fewer active files in part = 1, before=$beforePart1Count " +
+              s"after=$afterPart1Count")
+          assertCompactionMetrics(
+            metrics,
+            beforeFiles.size,
+            afterFiles.size,
+            "partition predicate OPTIMIZE",
+            expectedPartitionsOptimized = Some(1L))
+          assertOptimizeCommit(deltaLog, "partition predicate OPTIMIZE")
+          val result = spark.read.format("delta").load(path)
+          assert(result.select("id").collect().map(_.getLong(0)).toSet == (0L until 40L).toSet)
+          assert(result.where("part = 0").count() == 20)
+          assert(result.where("part = 1").count() == 20)
+      }
+    }
+  }
+
+  test("delta optimize command should not be offloaded when native write is disabled") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 10, 1, 2).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(10, 20, 1, 2).toDF("id").write.format("delta").mode("append").save(path)
+
+          withSQLConf(VeloxDeltaConfig.ENABLE_NATIVE_WRITE.key -> "false") {
+            val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+            assertNoNativeWriteCommand(
+              optimizeDf.queryExecution.executedPlan,
+              "OPTIMIZE with native write disabled")
+            optimizeDf.collect()
+          }
+
+          val result = spark.read.format("delta").load(path)
+          // Compare sorted lists: a Set would hide rows duplicated by compaction.
+          assert(result.collect().map(_.getLong(0)).sorted.toList == (0L until 20L).toList)
+      }
+    }
+  }
+}
diff --git a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
index 0b5423ab227..bca0a66d1ad 100644
--- a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
+++ 
b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
@@ -19,17 +19,20 @@ package org.apache.spark.sql.delta
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.config.VeloxDeltaConfig
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import org.apache.spark.sql.execution.datasources.v2.{GlutenDeltaLeafRunnableCommand, GlutenDeltaLeafV2CommandExec}
+import org.apache.spark.sql.execution.datasources.v2.{GlutenDeltaLeafRunnableCommand, GlutenDeltaLeafV2CommandExec, GlutenDeltaRunnableCommand}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.QueryExecutionListener
 
-import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.{CopyOnWriteArrayList, TimeUnit}
 
 import scala.jdk.CollectionConverters._
 
@@ -86,6 +89,7 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
     val nativeClassMatch = plan
       .collectFirst {
         case ExecutedCommandExec(_: GlutenDeltaLeafRunnableCommand) => true
+        case ExecutedCommandExec(_: GlutenDeltaRunnableCommand) => true
         case _: GlutenDeltaLeafV2CommandExec => true
       }
       .getOrElse(false)
@@ -115,6 +119,21 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
     spark.listenerManager.register(listener)
     try {
       action
+      val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10)
+      var lastSize = -1
+      var stableSince = System.nanoTime()
+      while (System.nanoTime() < deadline) {
+        val size = plans.size()
+        val now = System.nanoTime()
+        if (size != lastSize) {
+          lastSize = size
+          stableSince = now
+        }
+        if (size > 0 && now - stableSince >= TimeUnit.MILLISECONDS.toNanos(500)) {
+          return plans.asScala.toSeq
+        }
+        Thread.sleep(50)
+      }
     } finally {
       spark.listenerManager.unregister(listener)
     }
@@ -137,6 +156,62 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
     )
   }
 
+  // Collects the AddFile entries of the snapshot returned by deltaLog.update().
+  private def files(deltaLog: DeltaLog): Set[AddFile] = {
+    deltaLog.update().allFiles.collect().toSet
+  }
+
+  // Extracts the single OptimizeMetrics row from an OPTIMIZE result DataFrame.
+  private def collectOptimizeMetrics(df: DataFrame): OptimizeMetrics = {
+    val metrics = df.select("metrics.*").as[OptimizeMetrics].collect()
+    assert(metrics.length == 1, s"Expected one OPTIMIZE result row, got ${metrics.length}")
+    metrics.head
+  }
+
+  // Checks that the first entry of getHistory(Some(1)) is an OPTIMIZE operation.
+  private def assertOptimizeCommit(deltaLog: DeltaLog, context: String): Unit = {
+    val latestCommit = deltaLog.history.getHistory(Some(1)).head
+    assert(
+      latestCommit.operation == "OPTIMIZE",
+      s"Expected latest Delta operation for $context to be OPTIMIZE, got " +
+        latestCommit.operation)
+  }
+
+  // Checks that OPTIMIZE removed more files than it added and reported consistent metrics.
+  private def assertCompactionMetrics(
+      metrics: OptimizeMetrics,
+      beforeFileCount: Int,
+      afterFileCount: Int,
+      context: String,
+      expectedPartitionsOptimized: Option[Long] = None): Unit = {
+    assert(metrics.numFilesRemoved > 0, s"Expected files removed for $context")
+    assert(metrics.numFilesAdded > 0, s"Expected files added for $context")
+    assert(
+      afterFileCount < beforeFileCount,
+      s"Expected fewer active files after $context, before=$beforeFileCount after=$afterFileCount")
+    assert(
+      metrics.numFilesRemoved > metrics.numFilesAdded,
+      s"Expected $context to compact to fewer files, removed=${metrics.numFilesRemoved} " +
+        s"added=${metrics.numFilesAdded}"
+    )
+    assert(
+      metrics.filesRemoved.totalFiles == metrics.numFilesRemoved,
+      s"Removed file metrics did not match numFilesRemoved for $context")
+    assert(
+      metrics.filesAdded.totalFiles == metrics.numFilesAdded,
+      s"Added file metrics did not match numFilesAdded for $context")
+    assert(metrics.filesRemoved.totalSize > 0, s"Expected removed file size metrics for $context")
+    assert(metrics.filesAdded.totalSize > 0, s"Expected added file size metrics for $context")
+    assert(metrics.numBatches > 0, s"Expected at least one optimize batch for $context")
+    expectedPartitionsOptimized.foreach {
+      expected =>
+        assert(
+          metrics.partitionsOptimized == expected,
+          s"Expected $expected optimized partitions for $context, got " +
+            metrics.partitionsOptimized)
+    }
+  }
+
   test("native delta delete command should be offloaded") {
     withNativeWriteOffloadConf {
       withTempDir {
@@ -275,6 +350,153 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
       }
     }
   }
+
+  test("native delta optimize command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 32, 1, 4).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(32, 64, 1, 4).toDF("id").write.format("delta").mode("append").save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+          assertContainsNativeWriteCommand(Seq(optimizeDf.queryExecution.executedPlan), "OPTIMIZE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, "path OPTIMIZE")
+          assertOptimizeCommit(deltaLog, "path OPTIMIZE")
+          val result = spark.read.format("delta").load(path)
+          // Compare sorted lists: a Set would hide rows duplicated by compaction.
+          assert(result.collect().map(_.getLong(0)).sorted.toList == (0L until 64L).toList)
+      }
+    }
+  }
+
+  test("native delta optimize table command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTable("delta_native_optimize_table") {
+        spark
+          .range(0, 32, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("overwrite")
+          .saveAsTable("delta_native_optimize_table")
+        spark
+          .range(32, 64, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("append")
+          .saveAsTable("delta_native_optimize_table")
+
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier("delta_native_optimize_table"))
+        val beforeFiles = files(deltaLog)
+
+        val optimizeDf = sql("OPTIMIZE delta_native_optimize_table")
+        assertContainsNativeWriteCommand(
+          Seq(optimizeDf.queryExecution.executedPlan),
+          "OPTIMIZE table")
+        val metrics = collectOptimizeMetrics(optimizeDf)
+
+        val afterFiles = files(deltaLog)
+        assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, "table OPTIMIZE")
+        assertOptimizeCommit(deltaLog, "table OPTIMIZE")
+        val result = spark.read.table("delta_native_optimize_table")
+        // Compare sorted lists: a Set would hide rows duplicated by compaction.
+        assert(result.collect().map(_.getLong(0)).sorted.toList == (0L until 64L).toList)
+      }
+    }
+  }
+
+  test("native delta optimize partition predicate command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark
+            .range(0, 20, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+          spark
+            .range(20, 40, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+          val beforePart0Paths = beforeFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val beforePart1Count = beforeFiles.count(_.partitionValues.get("part").contains("1"))
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path` WHERE part = 1")
+          assertContainsNativeWriteCommand(
+            Seq(optimizeDf.queryExecution.executedPlan),
+            "OPTIMIZE WHERE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          val afterPart0Paths = afterFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val afterPart1Count = afterFiles.count(_.partitionValues.get("part").contains("1"))
+          assert(
+            beforePart0Paths.subsetOf(afterPart0Paths),
+            "OPTIMIZE WHERE part = 1 should not remove files from part = 0")
+          assert(
+            afterPart1Count < beforePart1Count,
+            s"Expected fewer active files in part = 1, before=$beforePart1Count " +
+              s"after=$afterPart1Count")
+          assertCompactionMetrics(
+            metrics,
+            beforeFiles.size,
+            afterFiles.size,
+            "partition predicate OPTIMIZE",
+            expectedPartitionsOptimized = Some(1L))
+          assertOptimizeCommit(deltaLog, "partition predicate OPTIMIZE")
+          val result = spark.read.format("delta").load(path)
+          assert(result.select("id").collect().map(_.getLong(0)).toSet == (0L until 40L).toSet)
+          assert(result.where("part = 0").count() == 20)
+          assert(result.where("part = 1").count() == 20)
+      }
+    }
+  }
+
+  test("delta optimize command should not be offloaded when native write is disabled") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 10, 1, 2).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(10, 20, 1, 2).toDF("id").write.format("delta").mode("append").save(path)
+
+          withSQLConf(VeloxDeltaConfig.ENABLE_NATIVE_WRITE.key -> "false") {
+            val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+            assertNoNativeWriteCommand(
+              Seq(optimizeDf.queryExecution.executedPlan),
+              "OPTIMIZE with native write disabled")
+            optimizeDf.collect()
+          }
+
+          val result = spark.read.format("delta").load(path)
+          // Compare sorted lists: a Set would hide rows duplicated by compaction.
+          assert(result.collect().map(_.getLong(0)).sorted.toList == (0L until 20L).toList)
+      }
+    }
+  }
+
   test("delta save command should not be offloaded when native write is disabled") {
     withNativeWriteOffloadConf {
       withTempDir {
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 616cb9ac7da..1050d085547 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -703,6 +703,9 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized")
   enableSuite[GlutenSparkPlannerSuite]
   enableSuite[GlutenSparkScriptTransformationSuite]
+    // Flaky in CI containers for Spark 4.1: intermittently fails with
+    // `/tmp/test-resource*.py: Permission denied` and can crash JVM.
+    .exclude("SPARK-33934: Add SparkFile's root dir to env property PATH")
   enableSuite[GlutenSparkSqlParserSuite]
   enableSuite[GlutenUnsafeFixedWidthAggregationMapSuite]
   enableSuite[GlutenUnsafeKVExternalSorterSuite]