@@ -1212,6 +1212,14 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
VeloxColumnarToCarrierRowExec.enforce(plan)
}

override def isSupportLocalTableScanExec(plan: LocalTableScanExec): Boolean = {
// Skip offloading when stream is defined (structured streaming source)
plan.getStream.isEmpty
}

override def getLocalTableScanTransform(plan: LocalTableScanExec): LocalTableScanTransformer =
VeloxLocalTableScanTransformer.replace(plan)

override def genTimestampAddTransformer(
substraitExprName: String,
left: ExpressionTransformer,
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.velox.VeloxValidatorApi
import org.apache.gluten.config.{GlutenConfig, VeloxConfig}

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{LocalTableScanExec, LocalTableScanTransformer, SparkPlan}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* Velox-backend implementation of LocalTableScanTransformer.
*
* Converts a driver-side local collection (Seq[InternalRow]) into columnar batches using Velox's
* native row-to-columnar conversion (same JNI path as RowToVeloxColumnarExec).
*/
case class VeloxLocalTableScanTransformer(
outputAttributes: Seq[Attribute],
rows: Seq[InternalRow],
// Row-to-columnar conversion preserves data distribution, so we carry through
// the original partitioning, consistent with RowToVeloxColumnarExec's behavior.
override val outputPartitioning: Partitioning,
override val outputOrdering: Seq[SortOrder]
) extends LocalTableScanTransformer(outputAttributes, outputPartitioning, outputOrdering)
with Logging {

@transient override lazy val metrics: Map[String, SQLMetric] = Map(
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
"convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to convert")
)

override protected def doValidateInternal(): ValidationResult = {
for (field <- schema.fields) {
val reason = VeloxValidatorApi.validateSchema(field.dataType)
if (reason.isDefined) {
return ValidationResult.failed(reason.get)
}
}
ValidationResult.succeeded
}

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
val convertTime = longMetric("convertTime")
val localSchema = this.schema
val batchSize = GlutenConfig.get.maxBatchSize
val batchBytes = VeloxConfig.get.veloxPreferredBatchBytes

if (rows.isEmpty) {
sparkContext.emptyRDD[ColumnarBatch]
} else {
// Materialize rows as UnsafeRow on the driver, then parallelize.
// copy() is required because UnsafeProjection reuses its output buffer across rows.
val proj = UnsafeProjection.create(outputAttributes, outputAttributes)
val unsafeRows = rows.map(r => proj(r).copy()).toArray
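// Cap the slice count at the row count so that no partition is empty.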
val numSlices = math.min(unsafeRows.length, sparkContext.defaultParallelism)
val rowRdd = sparkContext.parallelize(unsafeRows.toSeq, numSlices)

rowRdd.mapPartitions {
iter =>
RowToVeloxColumnarExec.toColumnarBatchIterator(
iter,
localSchema,
numInputRows,
numOutputBatches,
convertTime,
batchSize,
batchBytes)
}
}
}

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[SparkPlan]): SparkPlan = {
assert(newChildren.isEmpty, "VeloxLocalTableScanTransformer is a leaf node")
// Leaf node: there is nothing to rebuild, so return this instance unchanged.
this
}
}

object VeloxLocalTableScanTransformer {
def replace(plan: LocalTableScanExec): LocalTableScanTransformer =
VeloxLocalTableScanTransformer(
plan.output,
plan.rows,
plan.outputPartitioning,
plan.outputOrdering)
}
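A minimal usage sketch, assuming a Gluten-enabled SparkSession named `spark` with the Velox backend active; plan inspection uses AdaptiveSparkPlanHelper.collect to see through AQE wrapping, as the suite below does:

// Hypothetical snippet inside a class extending AdaptiveSparkPlanHelper.
val df = spark.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("id", "name")
df.collect() // local rows are converted to Velox columnar batches on executors
assert(collect(df.queryExecution.executedPlan) {
  case t: VeloxLocalTableScanTransformer => t
}.nonEmpty)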
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.execution._

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.types._

import java.util.{Arrays => JArrays}

class VeloxLocalTableScanSuite
extends VeloxWholeStageTransformerSuite
with AdaptiveSparkPlanHelper {

override protected val resourcePath: String = "/tpch-data-parquet"
override protected val fileFormat: String = "parquet"

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.ansi.enabled", "false")
}

private def assertHasVeloxLocalTableScan(df: DataFrame): Unit = {
val found = collect(df.queryExecution.executedPlan) {
case _: VeloxLocalTableScanTransformer => true
}
assert(found.nonEmpty, "Expected VeloxLocalTableScanTransformer in plan")
}

private def createDF(rows: Seq[Row], schema: StructType): DataFrame = {
spark.createDataFrame(JArrays.asList(rows: _*), schema)
}

test("basic LocalTableScanExec with int and string columns") {
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val rows = Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"))
val df = createDF(rows, schema)
checkAnswer(df, rows)
assertHasVeloxLocalTableScan(df)
}

test("LocalTableScan with numeric types") {
val schema = StructType(
Seq(
StructField("lng", LongType),
StructField("dbl", DoubleType),
StructField("flt", FloatType),
StructField("shrt", ShortType),
StructField("byt", ByteType)))
val rows = Seq(Row(1L, 1.5, 2.5f, 100.toShort, 42.toByte))
val df = createDF(rows, schema)
checkAnswer(df, rows)
assertHasVeloxLocalTableScan(df)
}

test("LocalTableScan with boolean and null types") {
val schema = StructType(
Seq(StructField("flag", BooleanType), StructField("value", IntegerType, nullable = true)))
val rows = Seq(Row(true, 1), Row(false, null))
val df = createDF(rows, schema)
checkAnswer(df, rows)
assertHasVeloxLocalTableScan(df)
}

test("LocalTableScan with empty collection") {
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val df = createDF(Seq.empty, schema)
checkAnswer(df, Seq.empty[Row])
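// No plan assertion here: an all-empty local relation is not guaranteed to be offloaded.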
}

test("LocalTableScan with aggregation downstream") {
val schema = StructType(Seq(StructField("key", StringType), StructField("value", IntegerType)))
val rows = Seq(Row("a", 10), Row("b", 20), Row("a", 30))
val df = createDF(rows, schema)
val result = df.groupBy("key").sum("value")
checkAnswer(result, Seq(Row("a", 40), Row("b", 20)))
assertHasVeloxLocalTableScan(result)
}

test("LocalTableScan with filter downstream") {
val schema = StructType(Seq(StructField("x", IntegerType)))
val rows = Seq(Row(1), Row(2), Row(3), Row(4), Row(5))
val df = createDF(rows, schema).filter("x > 3")
checkAnswer(df, Seq(Row(4), Row(5)))
assertHasVeloxLocalTableScan(df)
}

test("LocalTableScan with join") {
val leftSchema =
StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val rightSchema =
StructType(Seq(StructField("id", IntegerType), StructField("score", IntegerType)))
val left = createDF(Seq(Row(1, "a"), Row(2, "b")), leftSchema)
val right = createDF(Seq(Row(1, 100), Row(2, 200)), rightSchema)
val result = left.join(right, "id")
checkAnswer(result, Seq(Row(1, "a", 100), Row(2, "b", 200)))
assertHasVeloxLocalTableScan(result)
}
}
@@ -777,6 +777,11 @@ trait SparkPlanExecApi {
def getRDDScanTransform(plan: RDDScanExec): RDDScanTransformer =
throw new GlutenNotSupportException("RDDScanExec is not supported")

// Backends opt in by overriding both hooks; the defaults keep
// LocalTableScanExec running on vanilla Spark.
def isSupportLocalTableScanExec(plan: LocalTableScanExec): Boolean = false

def getLocalTableScanTransform(plan: LocalTableScanExec): LocalTableScanTransformer =
throw new GlutenNotSupportException("LocalTableScanExec is not supported")

def copyColumnarBatch(batch: ColumnarBatch): ColumnarBatch =
throw new GlutenNotSupportException("Copying ColumnarBatch is not supported")

@@ -315,6 +315,9 @@ object OffloadOthers {
child)
case plan: RDDScanExec if RDDScanTransformer.isSupportRDDScanExec(plan) =>
RDDScanTransformer.getRDDScanTransform(plan)
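// Mirrors the RDDScanExec dispatch above: probe backend support first,
// then substitute the transformer.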
case plan: LocalTableScanExec
if LocalTableScanTransformer.isSupportLocalTableScanExec(plan) =>
LocalTableScanTransformer.getLocalTableScanTransform(plan)
case p if !p.isInstanceOf[GlutenPlan] =>
logDebug(s"Transformation for ${p.getClass} is currently not supported.")
p
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.ValidatablePlan
import org.apache.gluten.extension.columnar.transition.Convention

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}

abstract class LocalTableScanTransformer(
outputAttributes: Seq[Attribute],
override val outputPartitioning: Partitioning = UnknownPartitioning(0),
override val outputOrdering: Seq[SortOrder] = Nil
) extends ValidatablePlan {

override def rowType0(): Convention.RowType = Convention.RowType.None
override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType
override def output: Seq[Attribute] = outputAttributes

override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException("This operator doesn't support doExecute().")
}

override def children: Seq[SparkPlan] = Seq.empty
}

object LocalTableScanTransformer {
def isSupportLocalTableScanExec(plan: LocalTableScanExec): Boolean =
BackendsApiManager.getSparkPlanExecApiInstance.isSupportLocalTableScanExec(plan)

def getLocalTableScanTransform(plan: LocalTableScanExec): LocalTableScanTransformer =
BackendsApiManager.getSparkPlanExecApiInstance.getLocalTableScanTransform(plan)
}
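The companion object is a thin indirection to the active backend. The complete dispatch chain added by this change, with every name as defined above:

OffloadOthers
  -> LocalTableScanTransformer.isSupportLocalTableScanExec(plan)
       -> SparkPlanExecApi.isSupportLocalTableScanExec      (Velox: plan.getStream.isEmpty)
  -> LocalTableScanTransformer.getLocalTableScanTransform(plan)
       -> SparkPlanExecApi.getLocalTableScanTransform       (Velox: VeloxLocalTableScanTransformer.replace)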