diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index b7a1e172b2c8..b6f839ebb156 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -1212,6 +1212,14 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
     VeloxColumnarToCarrierRowExec.enforce(plan)
   }
 
+  override def isSupportLocalTableScanExec(plan: LocalTableScanExec): Boolean = {
+    // Skip offloading when a stream is attached (structured streaming source).
+    plan.getStream.isEmpty
+  }
+
+  override def getLocalTableScanTransform(plan: LocalTableScanExec): LocalTableScanTransformer =
+    VeloxLocalTableScanTransformer.replace(plan)
+
   override def genTimestampAddTransformer(
       substraitExprName: String,
       left: ExpressionTransformer,
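
For orientation (not part of the diff): `LocalTableScanExec` is the leaf Spark plans for a driver-side collection, so the new hook fires for `createDataFrame`/`toDF` inputs; per the comment above, scans that carry a structured streaming source are left alone. A minimal sketch, assuming a local-mode SparkSession with Gluten disabled, of where this node shows up:

```scala
// A minimal sketch, assuming a local-mode SparkSession with Gluten disabled;
// it only shows where LocalTableScanExec appears in a physical plan.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.LocalTableScanExec

object LocalScanDemo extends App {
  val spark = SparkSession.builder().master("local[2]").appName("local-scan-demo").getOrCreate()
  import spark.implicits._

  // A driver-side Seq becomes a LocalRelation, planned as LocalTableScanExec.
  val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
  val scan = df.queryExecution.executedPlan.collectFirst { case s: LocalTableScanExec => s }
  assert(scan.isDefined, "expected a LocalTableScanExec leaf")
  // With this patch and the Velox backend active, the same plan would instead
  // carry a VeloxLocalTableScanTransformer, unless a streaming source is attached.
  spark.stop()
}
```
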
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxLocalTableScanTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxLocalTableScanTransformer.scala
new file mode 100644
index 000000000000..8d0c59d50b5f
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxLocalTableScanTransformer.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.velox.VeloxValidatorApi
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{LocalTableScanExec, LocalTableScanTransformer, SparkPlan}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * Velox-backend implementation of LocalTableScanTransformer.
+ *
+ * Converts a driver-side local collection (Seq[InternalRow]) into columnar batches using Velox's
+ * native row-to-columnar conversion (same JNI path as RowToVeloxColumnarExec).
+ */
+case class VeloxLocalTableScanTransformer(
+    outputAttributes: Seq[Attribute],
+    rows: Seq[InternalRow],
+    // Row-to-columnar conversion preserves data distribution, so we carry through
+    // the original partitioning, consistent with RowToVeloxColumnarExec's behavior.
+    override val outputPartitioning: Partitioning,
+    override val outputOrdering: Seq[SortOrder]
+) extends LocalTableScanTransformer(outputAttributes, outputPartitioning, outputOrdering)
+  with Logging {
+
+  @transient override lazy val metrics: Map[String, SQLMetric] = Map(
+    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
+    "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert")
+  )
+
+  override protected def doValidateInternal(): ValidationResult = {
+    for (field <- schema.fields) {
+      val reason = VeloxValidatorApi.validateSchema(field.dataType)
+      if (reason.isDefined) {
+        return ValidationResult.failed(reason.get)
+      }
+    }
+    ValidationResult.succeeded
+  }
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numInputRows = longMetric("numInputRows")
+    val numOutputBatches = longMetric("numOutputBatches")
+    val convertTime = longMetric("convertTime")
+    val localSchema = this.schema
+    val batchSize = GlutenConfig.get.maxBatchSize
+    val batchBytes = VeloxConfig.get.veloxPreferredBatchBytes
+
+    if (rows.isEmpty) {
+      sparkContext.emptyRDD[ColumnarBatch]
+    } else {
+      // Materialize rows as UnsafeRow on the driver, then parallelize
+      val proj = UnsafeProjection.create(outputAttributes, outputAttributes)
+      val unsafeRows = rows.map(r => proj(r).copy()).toArray
+      val numSlices = math.min(unsafeRows.length, sparkContext.defaultParallelism)
+      val rowRdd = sparkContext.parallelize(unsafeRows.toSeq, numSlices)
+
+      rowRdd.mapPartitions {
+        iter =>
+          RowToVeloxColumnarExec.toColumnarBatchIterator(
+            iter,
+            localSchema,
+            numInputRows,
+            numOutputBatches,
+            convertTime,
+            batchSize,
+            batchBytes)
+      }
+    }
+  }
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[SparkPlan]): SparkPlan = {
+    assert(newChildren.isEmpty, "VeloxLocalTableScanTransformer is a leaf node")
+    this
+  }
+}
+
+object VeloxLocalTableScanTransformer {
+  def replace(plan: LocalTableScanExec): LocalTableScanTransformer =
+    VeloxLocalTableScanTransformer(
+      plan.output,
+      plan.rows,
+      plan.outputPartitioning,
+      plan.outputOrdering)
+}
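
One detail in `doExecuteColumnar` worth calling out: the `.copy()` on each projected row is required because an `UnsafeProjection` reuses a single mutable output row across calls, so buffering the results without copying would leave every slot aliasing the same bytes. A standalone sketch of that pitfall, illustrative only, using a hypothetical one-column schema:

```scala
// Illustrative only: demonstrates why proj(r).copy() is needed when buffering
// UnsafeRows, as done in VeloxLocalTableScanTransformer.doExecuteColumnar.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
import org.apache.spark.sql.types.IntegerType

object ProjectionCopyDemo extends App {
  val attrs = Seq(AttributeReference("id", IntegerType, nullable = false)())
  val proj = UnsafeProjection.create(attrs, attrs)
  val rows = Seq(InternalRow(1), InternalRow(2), InternalRow(3))

  // Without copy(), every slot aliases the projection's single reused row.
  val aliased = rows.map(proj(_)).toArray
  // With copy(), each UnsafeRow gets its own backing bytes.
  val copied = rows.map(r => proj(r).copy()).toArray

  println(aliased.map(_.getInt(0)).mkString(",")) // 3,3,3 (all alias the last row)
  println(copied.map(_.getInt(0)).mkString(","))  // 1,2,3
}
```
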
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxLocalTableScanSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxLocalTableScanSuite.scala
new file mode 100644
index 000000000000..514821da284d
--- /dev/null
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxLocalTableScanSuite.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.types._
+
+import java.util.{Arrays => JArrays}
+
+class VeloxLocalTableScanSuite
+  extends VeloxWholeStageTransformerSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected val resourcePath: String = "/tpch-data-parquet"
+  override protected val fileFormat: String = "parquet"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.ansi.enabled", "false")
+  }
+
+  private def assertHasVeloxLocalTableScan(df: DataFrame): Unit = {
+    val found = collect(df.queryExecution.executedPlan) {
+      case _: VeloxLocalTableScanTransformer => true
+    }
+    assert(found.nonEmpty, "Expected VeloxLocalTableScanTransformer in plan")
+  }
+
+  private def createDF(rows: Seq[Row], schema: StructType): DataFrame = {
+    spark.createDataFrame(JArrays.asList(rows: _*), schema)
+  }
+
+  test("basic LocalTableScanExec with int and string columns") {
+    val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
+    val rows = Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"))
+    val df = createDF(rows, schema)
+    checkAnswer(df, rows)
+    assertHasVeloxLocalTableScan(df)
+  }
+
+  test("LocalTableScan with numeric types") {
+    val schema = StructType(
+      Seq(
+        StructField("lng", LongType),
+        StructField("dbl", DoubleType),
+        StructField("flt", FloatType),
+        StructField("shrt", ShortType),
+        StructField("byt", ByteType)))
+    val rows = Seq(Row(1L, 1.5, 2.5f, 100.toShort, 42.toByte))
+    val df = createDF(rows, schema)
+    checkAnswer(df, rows)
+    assertHasVeloxLocalTableScan(df)
+  }
+
+  test("LocalTableScan with boolean and null types") {
+    val schema = StructType(
+      Seq(StructField("flag", BooleanType), StructField("value", IntegerType, nullable = true)))
+    val rows = Seq(Row(true, 1), Row(false, null))
+    val df = createDF(rows, schema)
+    checkAnswer(df, rows)
+    assertHasVeloxLocalTableScan(df)
+  }
+
+  test("LocalTableScan with empty collection") {
+    val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
+    val df = createDF(Seq.empty, schema)
+    checkAnswer(df, Seq.empty[Row])
+  }
+
+  test("LocalTableScan with aggregation downstream") {
+    val schema = StructType(Seq(StructField("key", StringType), StructField("value", IntegerType)))
+    val rows = Seq(Row("a", 10), Row("b", 20), Row("a", 30))
+    val df = createDF(rows, schema)
+    val result = df.groupBy("key").sum("value")
+    checkAnswer(result, Seq(Row("a", 40), Row("b", 20)))
+    assertHasVeloxLocalTableScan(result)
+  }
+
+  test("LocalTableScan with filter downstream") {
+    val schema = StructType(Seq(StructField("x", IntegerType)))
+    val rows = Seq(Row(1), Row(2), Row(3), Row(4), Row(5))
+    val df = createDF(rows, schema).filter("x > 3")
+    checkAnswer(df, Seq(Row(4), Row(5)))
+    assertHasVeloxLocalTableScan(df)
+  }
+
+  test("LocalTableScan with join") {
+    val leftSchema =
+      StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
+    val rightSchema =
+      StructType(Seq(StructField("id", IntegerType), StructField("score", IntegerType)))
+    val left = createDF(Seq(Row(1, "a"), Row(2, "b")), leftSchema)
+    val right = createDF(Seq(Row(1, 100), Row(2, 200)), rightSchema)
+    val result = left.join(right, "id")
+    checkAnswer(result, Seq(Row(1, "a", 100), Row(2, "b", 200)))
+    assertHasVeloxLocalTableScan(result)
+  }
+}
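
The suite covers `createDataFrame` inputs only. A SQL inline table (`VALUES`) lowers to the same `LocalRelation` and would exercise the identical offload path; a possible follow-up case, sketched here rather than included in the change:

```scala
// Sketch of an additional case for this suite; assumes VALUES folds to a
// LocalRelation as in vanilla Spark. Not part of the PR as posted.
test("LocalTableScan from SQL VALUES") {
  val df = spark.sql("SELECT * FROM VALUES (1, 'a'), (2, 'b') AS t(id, name)")
  checkAnswer(df, Seq(Row(1, "a"), Row(2, "b")))
  assertHasVeloxLocalTableScan(df)
}
```
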
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index ce0a79f0bc21..827d371bcea0 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -777,6 +777,11 @@ trait SparkPlanExecApi {
   def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
     throw new GlutenNotSupportException("RDDScanExec is not supported")
 
+  def isSupportLocalTableScanExec(plan: LocalTableScanExec): Boolean = false
+
+  def getLocalTableScanTransform(plan: LocalTableScanExec): LocalTableScanTransformer =
+    throw new GlutenNotSupportException("LocalTableScanExec is not supported")
+
   def copyColumnarBatch(batch: ColumnarBatch): ColumnarBatch =
     throw new GlutenNotSupportException("Copying ColumnarBatch is not supported")
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 3d844607b391..5d116c2c75f2 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -315,6 +315,9 @@ object OffloadOthers {
           child)
       case plan: RDDScanExec if RDDScanTransformer.isSupportRDDScanExec(plan) =>
         RDDScanTransformer.getRDDScanTransform(plan)
+      case plan: LocalTableScanExec
+          if LocalTableScanTransformer.isSupportLocalTableScanExec(plan) =>
+        LocalTableScanTransformer.getLocalTableScanTransform(plan)
       case p if !p.isInstanceOf[GlutenPlan] =>
         logDebug(s"Transformation for ${p.getClass} is currently not supported.")
         p
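
The two new trait members follow Gluten's usual backend hook pair: a capability predicate with a conservative `false` default, plus a factory that throws unless overridden. A hedged sketch of how a non-Velox backend would opt in; `MyBackendPlanExecApi` and `MyLocalTableScanTransformer` are hypothetical names, not part of this change:

```scala
// Hedged sketch only; MyBackendPlanExecApi and MyLocalTableScanTransformer
// are hypothetical, shown to illustrate the extension point.
import org.apache.gluten.backendsapi.SparkPlanExecApi
import org.apache.spark.sql.execution.{LocalTableScanExec, LocalTableScanTransformer}

trait MyBackendPlanExecApi extends SparkPlanExecApi {
  override def isSupportLocalTableScanExec(plan: LocalTableScanExec): Boolean =
    true // or a backend-specific predicate, mirroring Velox's streaming check

  override def getLocalTableScanTransform(plan: LocalTableScanExec): LocalTableScanTransformer =
    MyLocalTableScanTransformer(plan.output, plan.rows, plan.outputPartitioning, plan.outputOrdering)
}
```
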
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/LocalTableScanTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/LocalTableScanTransformer.scala
new file mode 100644
index 000000000000..83ef3198635a
--- /dev/null
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/LocalTableScanTransformer.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.ValidatablePlan
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+
+abstract class LocalTableScanTransformer(
+    outputAttributes: Seq[Attribute],
+    override val outputPartitioning: Partitioning = UnknownPartitioning(0),
+    override val outputOrdering: Seq[SortOrder] = Nil
+) extends ValidatablePlan {
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+  override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType
+  override def output: Seq[Attribute] = outputAttributes
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException("This operator doesn't support doExecute().")
+  }
+
+  override def children: Seq[SparkPlan] = Seq.empty
+}
+
+object LocalTableScanTransformer {
+  def isSupportLocalTableScanExec(plan: LocalTableScanExec): Boolean =
+    BackendsApiManager.getSparkPlanExecApiInstance.isSupportLocalTableScanExec(plan)
+
+  def getLocalTableScanTransform(plan: LocalTableScanExec): LocalTableScanTransformer =
+    BackendsApiManager.getSparkPlanExecApiInstance.getLocalTableScanTransform(plan)
+}
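
Taken together, the companion object gives `OffloadOthers` a backend-agnostic entry point: the rule never names Velox, and each call bottoms out in whichever `SparkPlanExecApi` the active backend registered. A condensed illustration of that dispatch, simplified from the rule body above (the real rule handles many other node types):

```scala
// Condensed illustration of the dispatch added by this change; the real rule
// lives in OffloadOthers and matches many other operators as well.
import org.apache.spark.sql.execution.{LocalTableScanExec, LocalTableScanTransformer, SparkPlan}

object LocalScanOffload {
  def maybeOffloadLocalScan(plan: SparkPlan): SparkPlan = plan match {
    // The companion delegates to BackendsApiManager.getSparkPlanExecApiInstance,
    // so any backend that overrides the two hooks participates automatically.
    case p: LocalTableScanExec if LocalTableScanTransformer.isSupportLocalTableScanExec(p) =>
      LocalTableScanTransformer.getLocalTableScanTransform(p)
    case other => other
  }
}
```
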