@@ -34,10 +34,11 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSER
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.joins.BaseJoinExec
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{LongAccumulator, Utils}
@@ -481,9 +482,45 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSparkPlanHelper
   }

   private def getOperationMetrics(query: SparkPlan): util.Map[String, lang.Long] = {
-    collectFirst(query) { case m: MergeRowsExec => m }.map{ n =>
-      n.metrics.map { case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value) }
-    }.getOrElse(Map.empty[String, lang.Long]).asJava
+    collectFirst(query) { case m: MergeRowsExec => m } match {
+      case Some(mergeRowsExec) =>
+        val mergeMetrics = mergeRowsExec.metrics.map {
+          case (name, metric) => s"merge.$name" -> lang.Long.valueOf(metric.value)
+        }
+        val numSourceRows = getNumSourceRows(mergeRowsExec)
+        (mergeMetrics + ("merge.numSourceRows" -> lang.Long.valueOf(numSourceRows))).asJava
+      case None =>
+        Map.empty[String, lang.Long].asJava
+    }
   }
+
+  private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = {
+    def isTargetTableScan(plan: SparkPlan): Boolean = {
+      collectFirst(plan) {
+        case scan: BatchScanExec if scan.table.isInstanceOf[RowLevelOperationTable] => true
+      }.getOrElse(false)
+    }
+
+    val joinOpt = collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j }
+
+    joinOpt.flatMap { join =>
+      val leftIsTarget = isTargetTableScan(join.left)
+      val rightIsTarget = isTargetTableScan(join.right)
+
+      val sourceChild = if (leftIsTarget) {
+        Some(join.right)
+      } else if (rightIsTarget) {
+        Some(join.left)
+      } else {
+        None
+      }
+
+      sourceChild.flatMap { child =>
@szehon-ho (Member) commented on Oct 22, 2025:

Actually, why do we need to traverse again here? I thought join.left and join.right are already the children, so we can check those nodes directly. We don't want to traverse further, because each node without numOutputRows risks giving wrong information (that node may change the numOutputRows coming from its child).

The PR author (Contributor) replied:

I renamed it to findSourceSide, as we still need a step to find the source node that has numOutputRows.

For example, with:

+- *(2) BroadcastHashJoin ...
                     :- *(2) Project ... 
                     :  +- BatchScan ... 
                     +- BroadcastQueryStage ...
                        +- BroadcastExchange ... 
                           +- *(1) Project ...
                              +- *(1) LocalTableScan ...

we find that the BroadcastQueryStage subtree contains the source table (after isTargetTableScan has ruled it out as the target side), but we still need a traversal step to reach the LocalTableScan that carries numOutputRows. Since it is collectFirst, I don't think we need to worry about traversing too far.

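To make the point concrete, here is a minimal self-contained sketch (a toy Node type standing in for SparkPlan; the names are illustrative, not Spark's actual TreeNode API) showing why a direct metrics check on the join child is not enough and a recursive descent is needed:

    // Toy stand-in for SparkPlan; illustrative only, not Spark's TreeNode API.
    case class Node(name: String, metrics: Map[String, Long], children: Node*)

    // Recursive descent, analogous to collectFirst over a SparkPlan.
    def firstNumOutputRows(plan: Node): Option[Long] =
      plan.metrics.get("numOutputRows")
        .orElse(plan.children.view.flatMap(firstNumOutputRows).headOption)

    // Mirrors the plan above: numOutputRows lives on the LocalTableScan,
    // several levels below the join's BroadcastQueryStage child.
    val sourceSide =
      Node("BroadcastQueryStage", Map.empty,
        Node("BroadcastExchange", Map.empty,
          Node("Project", Map.empty,
            Node("LocalTableScan", Map("numOutputRows" -> 3L)))))

    assert(sourceSide.metrics.get("numOutputRows").isEmpty) // direct check finds nothing
    assert(firstNumOutputRows(sourceSide).contains(3L))     // descent finds the metric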
+        collectFirst(child) {
+          case plan if plan.metrics.contains("numOutputRows") => plan
+        }.flatMap(_.metrics.get("numOutputRows").map(_.value))
+      }
+    }.getOrElse(-1L)
+  }
 }

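A note on the fallback path above: getNumSourceRows returns -1 when no join is found under MergeRowsExec, when neither join side is identified as the target-table scan, or when no node on the source side exposes a numOutputRows metric. Consumers of the commit properties should therefore treat -1 as "unknown" rather than as a count; a minimal sketch of such a guard (sourceRowCount is a hypothetical helper, not part of this change):

    import java.{lang, util}
    import scala.jdk.CollectionConverters._

    // Hypothetical consumer-side helper: returns None when the metric is
    // absent or carries the -1 "unknown" sentinel.
    def sourceRowCount(props: util.Map[String, lang.Long]): Option[Long] =
      Option(props.get("merge.numSourceRows")).map(_.longValue).filter(_ >= 0L)

    val props = Map("merge.numSourceRows" -> lang.Long.valueOf(-1L)).asJava
    assert(sourceRowCount(props).isEmpty) // -1 means "unknown", not zero source rows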
@@ -1814,6 +1814,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase

       val table = catalog.loadTable(ident)
       val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numSourceRows") === "3")
       assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "2"))
       assert(commitProps("merge.numTargetRowsInserted") === "0")
       assert(commitProps("merge.numTargetRowsUpdated") === "1")
@@ -1870,6 +1871,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase

       val table = catalog.loadTable(ident)
       val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numSourceRows") === "3")
       assert(commitProps("merge.numTargetRowsCopied") === "0")
       assert(commitProps("merge.numTargetRowsInserted") === "1")
       assert(commitProps("merge.numTargetRowsUpdated") === "0")
@@ -1925,6 +1927,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase

       val table = catalog.loadTable(ident)
       val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numSourceRows") === "3")
       assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
       assert(commitProps("merge.numTargetRowsInserted") === "0")
       assert(commitProps("merge.numTargetRowsUpdated") === "2")
@@ -1982,6 +1985,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase

       val table = catalog.loadTable(ident)
       val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numSourceRows") === "3")
       assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
       assert(commitProps("merge.numTargetRowsInserted") === "0")
       assert(commitProps("merge.numTargetRowsUpdated") === "0")
@@ -2040,6 +2044,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase

       val table = catalog.loadTable(ident)
       val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numSourceRows") === "4")
       assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
       assert(commitProps("merge.numTargetRowsInserted") === "1")
       assert(commitProps("merge.numTargetRowsUpdated") === "2")
@@ -2098,6 +2103,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase

       val table = catalog.loadTable(ident)
       val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numSourceRows") === "4")
       assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
       assert(commitProps("merge.numTargetRowsInserted") === "1")
       assert(commitProps("merge.numTargetRowsUpdated") === "0")
@@ -2139,6 +2145,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase

       val table = catalog.loadTable(ident)
       val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("merge.numSourceRows") === "4")
       assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
       assert(commitProps("merge.numTargetRowsInserted") === "1")
       assert(commitProps("merge.numTargetRowsUpdated") === "0")
@@ -2154,6 +2161,41 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
     }
   }

test("Merge metrics with numSourceRows for empty source") {
withTempView("source") {
createAndInitTable(
"pk INT NOT NULL, salary INT, dep STRING",
"""{ "pk": 1, "salary": 100, "dep": "hr" }
|{ "pk": 2, "salary": 200, "dep": "software" }
|{ "pk": 3, "salary": 300, "dep": "hr" }
|""".stripMargin)

// source is empty
Seq.empty[Int].toDF("pk").createOrReplaceTempView("source")

sql(s"""MERGE INTO $tableNameAsString t
|USING source s
|ON t.pk = s.pk
|WHEN MATCHED THEN
| UPDATE SET salary = 1000
|WHEN NOT MATCHED BY SOURCE THEN
| DELETE
|""".stripMargin)

val table = catalog.loadTable(ident)
val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
assert(commitProps("merge.numSourceRows") === "-1") // if no numOutputRows, should be -1
assert(commitProps("merge.numTargetRowsCopied") === (if (deltaMerge) "0" else "0"))
assert(commitProps("merge.numTargetRowsInserted") === "0")
assert(commitProps("merge.numTargetRowsUpdated") === "0")
assert(commitProps("merge.numTargetRowsDeleted") === "3")
assert(commitProps("merge.numTargetRowsMatchedUpdated") === "0")
assert(commitProps("merge.numTargetRowsMatchedDeleted") === "0")
assert(commitProps("merge.numTargetRowsNotMatchedBySourceUpdated") === "0")
assert(commitProps("merge.numTargetRowsNotMatchedBySourceDeleted") === "3")
}
}

test("Merge schema evolution new column with set explicit column") {
Seq((true, true), (false, true), (true, false)).foreach {
case (withSchemaEvolution, schemaEvolutionEnabled) =>
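For anyone reproducing the empty-source case in the test above, a quick spark-shell diagnostic (illustrative, not part of the suite; assumes the shell's spark session and import spark.implicits._):

    import spark.implicits._

    // Inspect how the empty source relation is planned. Whether any node
    // carries a numOutputRows metric depends on this plan, which is why
    // merge.numSourceRows falls back to -1 here instead of reporting 0.
    val emptySource = Seq.empty[Int].toDF("pk")
    emptySource.createOrReplaceTempView("source")
    println(emptySource.queryExecution.executedPlan.treeString)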