diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index b7a1e172b2c8..416b4df01da2 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -1205,6 +1205,11 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
   override def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec =
     ColumnarRangeExec(rangeExec.range)
 
+  override def isSupportRDDScanExec(plan: RDDScanExec): Boolean = true
+
+  override def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
+    VeloxRDDScanTransformer.replace(plan)
+
   override def genColumnarTailExec(limit: Int, child: SparkPlan): ColumnarCollectTailBaseExec =
     ColumnarCollectTailExec(limit, child)
 
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala
new file mode 100644
index 000000000000..6e96c5e173b1
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxRDDScanTransformer.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.velox.VeloxValidatorApi
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{RDDScanTransformer, SparkPlan}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * Velox-backend implementation of RDDScanTransformer.
+ *
+ * Converts an RDD[InternalRow] into columnar batches using Velox's native row-to-columnar
+ * conversion (same JNI path as RowToVeloxColumnarExec).
+ */
+case class VeloxRDDScanTransformer(
+    outputAttributes: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    name: String,
+    // Row-to-columnar conversion preserves data distribution, so we carry through
+    // the original partitioning. This differs from CH which uses UnknownPartitioning(0)
+    // but is consistent with RowToVeloxColumnarExec's behavior.
+    override val outputPartitioning: Partitioning,
+    override val outputOrdering: Seq[SortOrder]
+) extends RDDScanTransformer(outputAttributes, outputPartitioning, outputOrdering) {
+
+  @transient override lazy val metrics: Map[String, SQLMetric] = Map(
+    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
+    "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert")
+  )
+
+  override protected def doValidateInternal(): ValidationResult = {
+    for (field <- schema.fields) {
+      val reason = VeloxValidatorApi.validateSchema(field.dataType)
+      if (reason.isDefined) {
+        return ValidationResult.failed(reason.get)
+      }
+    }
+    ValidationResult.succeeded
+  }
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numInputRows = longMetric("numInputRows")
+    val numOutputBatches = longMetric("numOutputBatches")
+    val convertTime = longMetric("convertTime")
+    val localSchema = this.schema
+    val batchSize = GlutenConfig.get.maxBatchSize
+    val batchBytes = VeloxConfig.get.veloxPreferredBatchBytes
+    rdd.mapPartitions {
+      iter =>
+        if (iter.hasNext) {
+          val first = iter.next()
+          first match {
+            case _: BatchCarrierRow =>
+              // RDD already contains columnar batches wrapped as carrier rows
+              // (e.g., from df.checkpoint() on a Gluten plan). Unwrap directly.
+              (Iterator.single(first) ++ iter).flatMap {
+                row =>
+                  BatchCarrierRow.unwrap(row).map {
+                    batch =>
+                      numOutputBatches += 1
+                      numInputRows += batch.numRows()
+                      batch
+                  }
+              }
+            case _ =>
+              // Standard InternalRow path - convert via native row-to-columnar.
+              RowToVeloxColumnarExec.toColumnarBatchIterator(
+                Iterator.single(first) ++ iter,
+                localSchema,
+                numInputRows,
+                numOutputBatches,
+                convertTime,
+                batchSize,
+                batchBytes)
+          }
+        } else {
+          Iterator.empty
+        }
+    }
+  }
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[SparkPlan]): SparkPlan = {
+    assert(newChildren.isEmpty, "VeloxRDDScanTransformer is a leaf node")
+    copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering)
+  }
+}
+
+object VeloxRDDScanTransformer {
+  def replace(plan: org.apache.spark.sql.execution.RDDScanExec): RDDScanTransformer =
+    VeloxRDDScanTransformer(
+      plan.output,
+      plan.inputRDD,
+      plan.nodeName,
+      plan.outputPartitioning,
+      plan.outputOrdering)
+}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala
new file mode 100644
index 000000000000..fe11c5b5eb2c
--- /dev/null
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRDDScanSuite.scala
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.classic.{Dataset => ClassicDataset}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+class VeloxRDDScanSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
+
+  override protected val resourcePath: String = "/tpch-data-parquet"
+  override protected val fileFormat: String = "parquet"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.sql.ansi.enabled", "false")
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    createTPCHNotNullTables()
+  }
+
+  /** Creates a DataFrame backed by LogicalRDD/RDDScanExec from an existing DataFrame. */
+  private def asRDDScanDF(data: DataFrame): DataFrame = {
+    val node = LogicalRDD(
+      data.queryExecution.analyzed.output,
+      data.queryExecution.toRdd)(data.sparkSession)
+    ClassicDataset.ofRows(spark, node).toDF()
+  }
+
+  test("basic RDDScanExec is replaced by VeloxRDDScanTransformer") {
+    val data = spark.sql("SELECT l_orderkey, l_partkey FROM lineitem LIMIT 10")
+    val expectedAnswer = data.collect()
+    val df = asRDDScanDF(data)
+
+    checkAnswer(df, expectedAnswer)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan with string and numeric types") {
+    val data = spark.sql("""SELECT l_returnflag, l_linestatus, l_quantity, l_extendedprice
+                           |FROM lineitem LIMIT 20""".stripMargin)
+    val expectedAnswer = data.collect()
+    val df = asRDDScanDF(data)
+
+    checkAnswer(df, expectedAnswer)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan with aggregation downstream") {
+    val query =
+      """SELECT l_returnflag, sum(l_quantity) AS sum_qty
+        |FROM lineitem
+        |WHERE l_shipdate <= date'1998-09-02'
+        |GROUP BY l_returnflag""".stripMargin
+    val data = spark.sql(query)
+    val expectedAnswer = data.collect()
+    val df = asRDDScanDF(data)
+
+    checkAnswer(df, expectedAnswer)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan with empty RDD") {
+    val data = spark.sql("SELECT l_orderkey FROM lineitem WHERE 1 = 0")
+    val expectedAnswer = data.collect()
+    val df = asRDDScanDF(data)
+
+    checkAnswer(df, expectedAnswer)
+    assert(df.count() == 0)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan preserves data correctness with multiple re-reads") {
+    val data = spark.sql("SELECT l_orderkey, l_partkey FROM lineitem LIMIT 50")
+    val expectedAnswer = data.collect()
+    val df = asRDDScanDF(data)
+
+    // Read twice to verify idempotency
+    checkAnswer(df, expectedAnswer)
+    checkAnswer(df, expectedAnswer)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan with null values") {
+    val rdd = spark.sparkContext.parallelize(
+      Seq(
+        Row(1, "a", null),
+        Row(null, "b", 2.0),
+        Row(3, null, 3.0)
+      ))
+    val schema = StructType(
+      Seq(
+        StructField("id", IntegerType, nullable = true),
+        StructField("name", StringType, nullable = true),
+        StructField("value", DoubleType, nullable = true)
+      ))
+    val data = spark.createDataFrame(rdd, schema)
+    val expectedAnswer = data.collect()
+    val df = asRDDScanDF(data)
+
+    checkAnswer(df, expectedAnswer)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan with all supported primitive types") {
+    val rdd = spark.sparkContext.parallelize(
+      Seq(
+        Row(
+          true,
+          1.toByte,
+          2.toShort,
+          3,
+          4L,
+          5.0f,
+          6.0,
+          "hello",
+          java.sql.Date.valueOf("2024-01-01"),
+          java.sql.Timestamp.valueOf("2024-01-01 12:00:00"),
+          Array[Byte](1, 2, 3),
+          BigDecimal("123.45").underlying()
+        )
+      ))
+    val schema = StructType(
+      Seq(
+        StructField("bool", BooleanType),
+        StructField("byte", ByteType),
+        StructField("short", ShortType),
+        StructField("int", IntegerType),
+        StructField("long", LongType),
+        StructField("float", FloatType),
+        StructField("double", DoubleType),
+        StructField("string", StringType),
+        StructField("date", DateType),
+        StructField("timestamp", TimestampType),
+        StructField("binary", BinaryType),
+        StructField("decimal", DecimalType(10, 2))
+      ))
+    val data = spark.createDataFrame(rdd, schema)
+    val expectedAnswer = data.collect()
+    val df = asRDDScanDF(data)
+
+    checkAnswer(df, expectedAnswer)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan with array type") {
+    val rdd = spark.sparkContext.parallelize(
+      Seq(
+        Row(Seq(1, 2, 3)),
+        Row(Seq(4, 5))
+      ))
+    val schema = StructType(Seq(StructField("arr", ArrayType(IntegerType))))
+    val data = spark.createDataFrame(rdd, schema)
+    val expectedAnswer = data.collect()
+    val df = asRDDScanDF(data)
+
+    checkAnswer(df, expectedAnswer)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan with map type") {
+    val rdd = spark.sparkContext.parallelize(
+      Seq(
+        Row(Map("a" -> 1, "b" -> 2)),
+        Row(Map("c" -> 3))
+      ))
+    val schema = StructType(Seq(StructField("m", MapType(StringType, IntegerType))))
+    val data = spark.createDataFrame(rdd, schema)
+    val expectedAnswer = data.collect()
+    val df = asRDDScanDF(data)
+
+    checkAnswer(df, expectedAnswer)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan with struct type") {
+    val rdd = spark.sparkContext.parallelize(
+      Seq(
+        Row(Row("hello", 1)),
+        Row(Row("world", 2))
+      ))
+    val innerSchema = StructType(
+      Seq(StructField("name", StringType), StructField("value", IntegerType)))
+    val schema = StructType(Seq(StructField("s", innerSchema)))
+    val data = spark.createDataFrame(rdd, schema)
+    val expectedAnswer = data.collect()
+    val df = asRDDScanDF(data)
+
+    checkAnswer(df, expectedAnswer)
+    val cnt = collect(df.queryExecution.executedPlan) { case _: VeloxRDDScanTransformer => true }
+    assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+  }
+
+  test("RDDScan falls back for unsupported types") {
+    val data = spark.sql("SELECT INTERVAL '1' DAY AS di")
+    val expectedAnswer = data.collect()
+    val result = asRDDScanDF(data)
+
+    // Should still produce correct results via fallback to vanilla Spark
+    checkAnswer(result, expectedAnswer)
+    val cnt = collect(result.queryExecution.executedPlan) {
+      case _: VeloxRDDScanTransformer => true
+    }
+    assert(cnt.isEmpty, "Expected fallback - VeloxRDDScanTransformer should NOT be in plan")
+  }
+
+  test("RDDScan handles BatchCarrierRow from checkpoint") {
+    val tempDir = Utils.createTempDir()
+    try {
+      spark.sparkContext.setCheckpointDir(tempDir.getAbsolutePath)
+      val df = spark.range(100).selectExpr("id", "id * 2 as value")
+      val checkpointed = df.localCheckpoint()
+      val result = asRDDScanDF(checkpointed)
+
+      checkAnswer(result, df.collect())
+      val cnt = collect(result.queryExecution.executedPlan) {
+        case _: VeloxRDDScanTransformer => true
+      }
+      assert(cnt.nonEmpty, "Expected VeloxRDDScanTransformer in plan")
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+}
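Reviewer note: below is a minimal usage sketch, not part of the patch, showing how the new offload path can be exercised outside the test suite. It assumes a SparkSession named `spark` running with the Gluten/Velox plugin enabled; everything else uses public Spark APIs plus the VeloxRDDScanTransformer class added above. spark.createDataFrame over an RDD[Row] plans a LogicalRDD, which becomes the RDDScanExec leaf that this patch replaces.

import org.apache.gluten.execution.VeloxRDDScanTransformer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Build a DataFrame directly from an RDD[Row]; this plans an RDDScanExec leaf.
val rowRdd = spark.sparkContext.parallelize(Seq(Row(1, "a"), Row(2, "b")))
val schema = StructType(
  Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = spark.createDataFrame(rowRdd, schema)
df.collect()

// With this patch applied, the executed plan should carry VeloxRDDScanTransformer instead
// of an RDDScanExec followed by a row-to-columnar transition. With AQE enabled, traverse
// the adaptive plan instead (e.g. via AdaptiveSparkPlanHelper, as the suite above does).
val offloaded = df.queryExecution.executedPlan.collect {
  case t: VeloxRDDScanTransformer => t
}
println(s"offloaded RDD scans: ${offloaded.size}")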