Skip to content

Commit

Permalink
Fixed build for 340+
Browse files Browse the repository at this point in the history
  • Loading branch information
razajafri committed Sep 25, 2024
1 parent ee946a5 commit 9b68cd3
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
{"spark": "342"}
{"spark": "343"}
{"spark": "350"}
{"spark": "350db"}
{"spark": "351"}
{"spark": "352"}
{"spark": "400"}
Expand All @@ -42,6 +41,6 @@ trait PartitionedFileUtilsShimBase {
SparkPath.fromPathString(filePath), start, length)

def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile = {
pf.copy(locations = locations.toSeq)
pf.copy(locations = locations.toArray)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

/*** spark-rapids-shim-json-lines
{"spark": "350"}
{"spark": "350db"}
{"spark": "351"}
{"spark": "352"}
spark-rapids-shim-json-lines ***/
Expand All @@ -25,7 +24,7 @@ package com.nvidia.spark.rapids.shims
import com.nvidia.spark.rapids._

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.MapInArrowExec
import org.apache.spark.sql.execution.python.PythonMapInArrowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta
Expand All @@ -34,7 +33,7 @@ import org.apache.spark.sql.types.{BinaryType, StringType}
object PythonMapInArrowExecShims {

def execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = Seq(
GpuOverrides.exec[MapInArrowExec](
GpuOverrides.exec[PythonMapInArrowExec](
"The backend for Map Arrow Iterator UDF. Accelerates the data transfer between the" +
" Java process and the Python process. It also supports scheduling GPU resources" +
" for the Python process when enabled.",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "350db"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Shim for the Databricks 3.5.0 (350db) PartitionedFile API. Other Spark
// versions have their own copy of this trait; keep the signatures in sync.
trait PartitionedFileUtilsShimBase {

  // Wrapper for case class constructor so Java code can access
  // the default values across Spark versions.
  // `filePath` is a plain path string and is converted to a SparkPath here.
  def newPartitionedFile(partitionValues: InternalRow,
      filePath: String,
      start: Long,
      length: Long): PartitionedFile = PartitionedFile(partitionValues,
    SparkPath.fromPathString(filePath), start, length)

  // Returns a copy of `pf` with its preferred host locations replaced.
  // NOTE(review): this shim passes the Seq straight through, while the
  // 340+ shim in the same commit converts with `.toArray` — presumably the
  // 350db `PartitionedFile.locations` field is Seq-typed; confirm against
  // the Databricks 3.5.0 sources before consolidating these shims.
  def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile = {
    pf.copy(locations = locations)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "350db"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids._

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.MapInArrowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta
import org.apache.spark.sql.types.{BinaryType, StringType}

object PythonMapInArrowExecShims {

  /**
   * The exec rules contributed by this shim, keyed by the Spark plan class
   * they replace. On Databricks 3.5.0 the plan node is `MapInArrowExec`.
   */
  def execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = {
    val mapInArrowRule = GpuOverrides.exec[MapInArrowExec](
      "The backend for Map Arrow Iterator UDF. Accelerates the data transfer between the" +
        " Java process and the Python process. It also supports scheduling GPU resources" +
        " for the Python process when enabled.",
      ExecChecks((TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
        TypeSig.all),
      (mapInArrow, conf, parent, rule) =>
        new GpuPythonMapInArrowExecMeta(mapInArrow, conf, parent, rule) {
          override def tagPlanForGpu(): Unit = {
            super.tagPlanForGpu()
            // When Arrow large variable-width types are enabled, string/binary
            // columns (anywhere in the nested schema) cannot run on the GPU.
            if (SQLConf.get.getConf(SQLConf.ARROW_EXECUTION_USE_LARGE_VAR_TYPES)) {
              val schemaTypes = mapInArrow.child.schema.fields.map(_.dataType) ++
                mapInArrow.output.map(_.dataType)
              val usesStringOrBinary = schemaTypes.exists { dataType =>
                TrampolineUtil.dataTypeExistsRecursively(dataType,
                  dt => dt == StringType || dt == BinaryType)
              }
              if (usesStringOrBinary) {
                willNotWorkOnGpu(s"${SQLConf.ARROW_EXECUTION_USE_LARGE_VAR_TYPES.key} is " +
                  "enabled and the schema contains string or binary types. This is not " +
                  "supported on the GPU.")
              }
            }
          }
        })
    Seq(mapInArrowRule).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
  }

}

0 comments on commit 9b68cd3

Please sign in to comment.