From 8bc3dff48c65378331fd294e9365f1bc977931ac Mon Sep 17 00:00:00 2001
From: wforget <643348094@qq.com>
Date: Thu, 4 Sep 2025 21:49:41 +0800
Subject: [PATCH 1/3] feat: Improve some confusing fallback reasons

---
 .../apache/comet/rules/CometScanRule.scala    | 101 +++++++++---------
 1 file changed, 53 insertions(+), 48 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 491502015a..62fbae6928 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -64,59 +64,64 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
   }
 
   private def _apply(plan: SparkPlan): SparkPlan = {
-    if (!isCometLoaded(conf) || !isCometScanEnabled(conf)) {
-      if (!isCometLoaded(conf)) {
-        withInfo(plan, "Comet is not enabled")
-      } else if (!isCometScanEnabled(conf)) {
-        withInfo(plan, "Comet Scan is not enabled")
-      }
-      plan
-    } else {
+    if (!isCometLoaded(conf)) return plan
 
-      def hasMetadataCol(plan: SparkPlan): Boolean = {
-        plan.expressions.exists(_.exists {
-          case a: Attribute =>
-            a.isMetadataCol
-          case _ => false
-        })
-      }
+    def isSupportedScanNode(plan: SparkPlan): Boolean = plan match {
+      case _: FileSourceScanExec => true
+      case _: BatchScanExec => true
+      case _ => false
+    }
 
-      def isIcebergMetadataTable(scanExec: BatchScanExec): Boolean = {
-        // List of Iceberg metadata tables:
-        // https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables
-        val metadataTableSuffix = Set(
-          "history",
-          "metadata_log_entries",
-          "snapshots",
-          "entries",
-          "files",
-          "manifests",
-          "partitions",
-          "position_deletes",
-          "all_data_files",
-          "all_delete_files",
-          "all_entries",
-          "all_manifests")
-
-        metadataTableSuffix.exists(suffix => scanExec.table.name().endsWith(suffix))
-      }
+    def hasMetadataCol(plan: SparkPlan): Boolean = {
+      plan.expressions.exists(_.exists {
+        case a: Attribute =>
+          a.isMetadataCol
+        case _ => false
+      })
+    }
 
-      plan.transform {
-        case scan if hasMetadataCol(scan) =>
-          withInfo(scan, "Metadata column is not supported")
+    def isIcebergMetadataTable(scanExec: BatchScanExec): Boolean = {
+      // List of Iceberg metadata tables:
+      // https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables
+      val metadataTableSuffix = Set(
+        "history",
+        "metadata_log_entries",
+        "snapshots",
+        "entries",
+        "files",
+        "manifests",
+        "partitions",
+        "position_deletes",
+        "all_data_files",
+        "all_delete_files",
+        "all_entries",
+        "all_manifests")
+
+      metadataTableSuffix.exists(suffix => scanExec.table.name().endsWith(suffix))
+    }
 
-        // data source V1
-        case scanExec: FileSourceScanExec =>
-          transformV1Scan(scanExec)
+    def transformScan(plan: SparkPlan): SparkPlan = plan match {
+      case scan if !isCometScanEnabled(conf) =>
+        withInfo(scan, "Comet Scan is not enabled")
 
-        // data source V2
-        case scanExec: BatchScanExec =>
-          if (isIcebergMetadataTable(scanExec)) {
-            withInfo(scanExec, "Iceberg Metadata tables are not supported")
-          } else {
-            transformV2Scan(scanExec)
-          }
-      }
+      case scan if hasMetadataCol(scan) =>
+        withInfo(scan, "Metadata column is not supported")
+
+      // data source V1
+      case scanExec: FileSourceScanExec =>
+        transformV1Scan(scanExec)
+
+      // data source V2
+      case scanExec: BatchScanExec =>
+        if (isIcebergMetadataTable(scanExec)) {
+          withInfo(scanExec, "Iceberg Metadata tables are not supported")
+        } else {
+          transformV2Scan(scanExec)
+        }
+    }
+
+    plan.transform {
+      case scan if isSupportedScanNode(scan) => transformScan(scan)
     }
   }

From 93d233bdbfd9538b61743bb80b9be710d9bb6f1e Mon Sep 17 00:00:00 2001
From: wforget <643348094@qq.com>
Date: Thu, 4 Sep 2025 21:53:59 +0800
Subject: [PATCH 2/3] fix comet shuffle manager class name

---
 spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 3500cce056..bee408d147 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -741,7 +741,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
         s"Comet shuffle is not enabled: ${COMET_EXEC_SHUFFLE_ENABLED.key} is not enabled")
       false
     } else if (!isCometShuffleManagerEnabled(op.conf)) {
-      withInfo(op, s"spark.shuffle.manager is not set to ${CometShuffleManager.getClass.getName}")
+      withInfo(op, s"spark.shuffle.manager is not set to ${classOf[CometShuffleManager].getName}")
       false
     } else {
       true

From 7e5c900c9db0682852a40429e98e4ec21dbdced8 Mon Sep 17 00:00:00 2001
From: wforget <643348094@qq.com>
Date: Fri, 5 Sep 2025 13:21:19 +0800
Subject: [PATCH 3/3] avoid adding fallback reasons repeatedly

---
 .../org/apache/comet/CometSparkSessionExtensions.scala | 7 +++++++
 .../scala/org/apache/comet/rules/CometExecRule.scala   | 9 +++++++--
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index 5348f2936c..88535577b6 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -396,6 +396,13 @@ object CometSparkSessionExtensions extends Logging {
     withInfos(node, Set.empty, exprs: _*)
   }
 
+  /**
+   * Checks whether a TreeNode has any explain information attached
+   */
+  def hasExplainInfo(node: TreeNode[_]): Boolean = {
+    node.getTagValue(CometExplainInfo.EXTENSION_INFO).exists(_.nonEmpty)
+  }
+
   // Helper to reduce boilerplate
   def createMessage(condition: Boolean, message: => String): Option[String] = {
     if (condition) {

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index bee408d147..7b9a6e8049 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -540,8 +540,13 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
         // these cases specially here so we do not add a misleading 'info' message
         op
       case _ =>
-        // An operator that is not supported by Comet
-        withInfo(op, s"${op.nodeName} is not supported")
+        if (!hasExplainInfo(op)) {
+          // An operator that is not supported by Comet
+          withInfo(op, s"${op.nodeName} is not supported")
+        } else {
+          // Already has fallback reason, do not override it
+          op
+        }
     }
   }
 }
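
Note on PATCH 2/3: in Scala, CometShuffleManager.getClass.getName resolves against the
companion object, whose runtime class name carries a trailing '$', so the fallback message
printed a class name that spark.shuffle.manager should never be set to;
classOf[CometShuffleManager].getName returns the name of the class itself. A minimal
standalone sketch of the difference (Demo and ShuffleManagerLike are stand-in names, not
Comet code):

    object Demo {
      class ShuffleManagerLike // stand-in for a manager class referenced by a config key
      object ShuffleManagerLike // its companion object

      def main(args: Array[String]): Unit = {
        // Runtime class of the companion object: note the trailing '$'
        println(ShuffleManagerLike.getClass.getName) // Demo$ShuffleManagerLike$
        // The class itself: the form a class-name config value expects
        println(classOf[ShuffleManagerLike].getName) // Demo$ShuffleManagerLike
      }
    }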