Conversation

Contributor

@asl3 asl3 commented Oct 20, 2025

What changes were proposed in this pull request?

Add a numSourceRows metric to MergeIntoExec, taken from the source node's numOutputRows.

This assumes that all child nodes expose numOutputRows. If the metric is not found, numSourceRows is reported as -1.

Why are the changes needed?

Improve completeness and debuggability of Merge Into metrics.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test cases for the numSourceRows metric.

Was this patch authored or co-authored using generative AI tooling?

No.

@asl3 asl3 changed the title [SPARK-52578][SQL] Add numSourceRows metric for MergeIntoExec [SPARK-52578][SQL] Add numSourceRows metric for MergeIntoExec Oct 20, 2025
Member

@szehon-ho szehon-ho left a comment

Thanks @asl3, I left some initial style comments.

private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = {
def isTargetTableScan(plan: SparkPlan): Boolean = {
collectFirst(plan) {
case scan: BatchScanExec if scan.table.isInstanceOf[RowLevelOperationTable] => true
Member

Can we pattern match directly here, e.g. (case BatchScanExec(t: RowLevelOperationTable))?
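To illustrate the difference between the two styles, here is a minimal sketch using hypothetical stand-in classes rather than the real Spark ones. The single-field `BatchScanExec` below is an assumption for illustration only; the real class may take additional constructor fields, in which case the remaining positions in the pattern would need wildcards.

```scala
object TypedPatternSketch {
  // Hypothetical stand-ins; the real Spark classes carry more fields.
  sealed trait Table
  final case class RowLevelOperationTable(name: String) extends Table
  final case class PlainTable(name: String) extends Table

  sealed trait Plan
  final case class BatchScanExec(table: Table) extends Plan
  final case class OtherExec(child: Plan) extends Plan

  // Style under review: bind the node, then test the field type in a guard.
  def isTargetWithGuard(plan: Plan): Boolean = plan match {
    case scan: BatchScanExec if scan.table.isInstanceOf[RowLevelOperationTable] => true
    case _ => false
  }

  // Suggested style: a typed pattern inside the extractor does the same check.
  def isTargetWithPattern(plan: Plan): Boolean = plan match {
    case BatchScanExec(_: RowLevelOperationTable) => true
    case _ => false
  }
}
```

Both functions accept and reject the same nodes; the second simply moves the type test into the pattern.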

def isTargetTableScan(plan: SparkPlan): Boolean = {
collectFirst(plan) {
case scan: BatchScanExec if scan.table.isInstanceOf[RowLevelOperationTable] => true
}.getOrElse(false)
Member

isDefined seems clearer here, but it's a personal preference.
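For comparison, a small sketch of the two spellings. The `Plan` tree and the `collectFirst` helper here are simplified, hypothetical stand-ins for the plan traversal used in the PR, not the Spark implementation.

```scala
object IsDefinedSketch {
  // Hypothetical stand-in for a plan tree.
  sealed trait Plan { def children: Seq[Plan] }
  final case class Scan(isTarget: Boolean) extends Plan {
    def children: Seq[Plan] = Nil
  }
  final case class Node(children: Seq[Plan]) extends Plan

  // Pre-order search: returns the first match of `pf` in the tree, if any.
  def collectFirst[A](plan: Plan)(pf: PartialFunction[Plan, A]): Option[A] =
    pf.lift(plan).orElse(
      plan.children.iterator
        .map(child => collectFirst(child)(pf))
        .collectFirst { case Some(found) => found })

  // Current style: map the match to `true`, then default to `false`.
  def viaGetOrElse(plan: Plan): Boolean =
    collectFirst(plan) { case Scan(true) => true }.getOrElse(false)

  // Suggested style: only ask whether a match exists.
  def viaIsDefined(plan: Plan): Boolean =
    collectFirst(plan) { case scan @ Scan(true) => scan }.isDefined
}
```

The two are equivalent in behavior; `isDefined` just states the intent ("does a target scan exist?") without a placeholder value.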


val joinOpt = collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j }

joinOpt.flatMap { join =>
Member

Style: what do you think about a for comprehension here to avoid the nested flatMap?

    (for {
      join <- collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => j }
      sourceChild <- findSourceChild(join)
      plan <- collectFirst(sourceChild) {
        case plan if plan.metrics.contains("numOutputRows") => plan
      }
      metric <- plan.metrics.get("numOutputRows")
    } yield metric.value).getOrElse(-1L)


  private def findSourceChild(join: BaseJoinExec): Option[SparkPlan] = {
    val leftIsTarget = isTargetTableScan(join.left)
    val rightIsTarget = isTargetTableScan(join.right)

    if (leftIsTarget) {
      Some(join.right)
    } else if (rightIsTarget) {
      Some(join.left)
    } else {
      None
    }
  }
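The suggestion above can be tried out in isolation with the following sketch. The `Plan` case class, its `isTargetScan` flag, and the `"join"` name check are hypothetical simplifications standing in for SparkPlan, isTargetTableScan, and BaseJoinExec; only the shape of the for comprehension mirrors the review comment.

```scala
object ForComprehensionSketch {
  // Hypothetical stand-in for a physical plan node carrying named metrics.
  final case class Plan(
      name: String,
      metrics: Map[String, Long] = Map.empty,
      children: Seq[Plan] = Nil,
      isTargetScan: Boolean = false)

  // Pre-order search for the first node matching `pf`.
  def collectFirst[A](plan: Plan)(pf: PartialFunction[Plan, A]): Option[A] =
    pf.lift(plan).orElse(
      plan.children.iterator
        .map(child => collectFirst(child)(pf))
        .collectFirst { case Some(found) => found })

  // The source is whichever join side is not the target scan.
  def findSourceChild(join: Plan): Option[Plan] = join.children match {
    case Seq(left, right) if left.isTargetScan  => Some(right)
    case Seq(left, right) if right.isTargetScan => Some(left)
    case _                                      => None
  }

  // Each generator short-circuits to None, so -1 covers every failure case.
  def numSourceRows(root: Plan): Long =
    (for {
      join <- collectFirst(root) { case p if p.name == "join" => p }
      sourceChild <- findSourceChild(join)
      plan <- collectFirst(sourceChild) {
        case p if p.metrics.contains("numOutputRows") => p
      }
      metric <- plan.metrics.get("numOutputRows")
    } yield metric).getOrElse(-1L)
}
```

Any missing piece (no join, no identifiable target side, no node with the metric) falls through to the same `-1L` default without nested `flatMap` calls.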

None
}

sourceChild.flatMap { child =>
Member

@szehon-ho szehon-ho Oct 22, 2025

Actually, why do we need to traverse again here? I thought join.left and join.right are already the child, so we can check that node directly. We don't want to traverse further, because each node without numOutputRows risks reporting wrong information (that node may change the number of output rows relative to its child).
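A small sketch of the concern, using a hypothetical `Plan` stand-in: if the join's source child has no numOutputRows of its own and changes the row count (for example a filter-like node), walking down to the first descendant that has the metric reports the descendant's count, not the count that actually reached the join.

```scala
object DirectCheckSketch {
  // Hypothetical stand-in for a physical plan node carrying named metrics.
  final case class Plan(
      name: String,
      metrics: Map[String, Long] = Map.empty,
      children: Seq[Plan] = Nil)

  // Traversal: take numOutputRows from the first node in the subtree that has it.
  def viaTraversal(sourceChild: Plan): Long = {
    def firstMetric(plan: Plan): Option[Long] =
      plan.metrics.get("numOutputRows").orElse(
        plan.children.iterator.map(firstMetric).collectFirst { case Some(v) => v })
    firstMetric(sourceChild).getOrElse(-1L)
  }

  // Direct check: only trust the metric on the join's immediate child.
  def viaDirectCheck(sourceChild: Plan): Long =
    sourceChild.metrics.getOrElse("numOutputRows", -1L)
}
```

With a metric-less node sitting above a scan that emitted 10 rows, the traversal returns 10 even if fewer rows passed through the intermediate node, while the direct check returns -1 and makes the gap visible.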

Member

@szehon-ho szehon-ho left a comment

Some more comments on the tests

}
}

test("Merge metrics with numSourceRows for empty source") {
Member

Is this because the optimizer gets rid of the join since it returns no results (no matching PKs between source and target)? Let's tackle it in a follow-up.


val table = catalog.loadTable(ident)
val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
assert(commitProps("merge.numSourceRows") === "4")
Member

Let's run these tests in both AQE and non-AQE mode to verify the walk works in both scenarios.
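One way to structure this is to run the same check once per AQE setting. The sketch below is self-contained and does not use Spark: the mutable map stands in for the session configuration, and `withConf` and `inBothModes` are hypothetical helpers written for illustration. Only the key `spark.sql.adaptive.enabled` is the real Spark setting; in the actual suite the existing test utilities for temporarily setting SQL configs would presumably be used instead.

```scala
import scala.collection.mutable

object AqeModesSketch {
  // Spark's config key that toggles adaptive query execution.
  val AqeKey = "spark.sql.adaptive.enabled"

  // Hypothetical helper: set the given keys, run `body`, then restore old values.
  def withConf[T](conf: mutable.Map[String, String])(pairs: (String, String)*)(body: => T): T = {
    val saved = pairs.map { case (key, _) => key -> conf.get(key) }
    pairs.foreach { case (key, value) => conf(key) = value }
    try body
    finally saved.foreach {
      case (key, Some(old)) => conf(key) = old
      case (key, None)      => conf.remove(key)
    }
  }

  // Hypothetical helper: run the same check with AQE on and then off.
  def inBothModes[T](conf: mutable.Map[String, String])(check: Boolean => T): Seq[T] =
    Seq(true, false).map { aqe =>
      withConf(conf)(AqeKey -> aqe.toString) { check(aqe) }
    }
}
```

Inside `check`, the test body would run the merge and assert on merge.numSourceRows, so the same assertion is exercised under both plan shapes.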
