Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Databricks 14.3 Support [DO NOT MERGE] #11467

Draft
wants to merge 38 commits into
base: branch-24.12
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0efcf6e
db 14.3
Sep 5, 2024
ed1abbf
release350db
Sep 5, 2024
1c83368
avro and parquet fix
Sep 6, 2024
04a85e4
more dependencies
razajafri Sep 6, 2024
60cf190
avro dependecy
razajafri Sep 10, 2024
6f9397f
move shims around
razajafri Sep 11, 2024
fdd4c03
More Shim changes
razajafri Sep 11, 2024
db70c02
upmerged
razajafri Sep 11, 2024
4c547db
Signing off
razajafri Sep 11, 2024
7c1ea3e
Fixed jar name in install deps
razajafri Sep 11, 2024
799a909
updated Scala 2.13 pom
razajafri Sep 16, 2024
8ea5a52
add a check for 3.5 in install_deps.py
razajafri Sep 16, 2024
a1df759
Removed unnecessary changes
razajafri Sep 16, 2024
5eff73e
Removed PartitionedFileUtilsShim
razajafri Sep 16, 2024
92c6f59
Updated PythonMapInArrowExec to MapInArrowExec
razajafri Sep 16, 2024
dafccf7
conf is private in sqlContext, use sessionState instead
razajafri Sep 16, 2024
32131b8
Moved NullOutputStreamShim
razajafri Sep 17, 2024
52d2418
sql-plugin building
razajafri Sep 17, 2024
383225c
First batch of delta lake changes
razajafri Sep 17, 2024
4fab4ba
Removed duplicate shims for 350db
razajafri Sep 18, 2024
4884080
delta lake support changes
razajafri Sep 18, 2024
413761f
More shims
razajafri Sep 19, 2024
3da0dc3
delta-lake changes for release 350db
razajafri Sep 23, 2024
62a3d90
Added ParquetCVShim and ShimParquetColumnVector
razajafri Sep 23, 2024
6fc8835
Shims created, building 341db successfully
razajafri Sep 23, 2024
e904927
CudfUnsafeRow changes for 350db
razajafri Sep 23, 2024
76c2fbc
Shim changes
razajafri Sep 24, 2024
f607d21
Removed the GpuOptimisticTransactionBase from common
razajafri Sep 24, 2024
341fff8
Organized imports
razajafri Sep 24, 2024
978778d
Reverted unnecessary change
razajafri Sep 24, 2024
0ed0c53
Fixed 341db build by adding shim and referring to the right module
razajafri Sep 24, 2024
1e690cf
Renamed ShimVectorizedColumnReader to RapidsVectorizedColumnReader to…
razajafri Sep 24, 2024
394604c
Don't use native dictionary for 350db for now
razajafri Sep 24, 2024
ee946a5
Updated Scala2.13 poms
razajafri Sep 25, 2024
9b68cd3
Fixed build for 340+
razajafri Sep 25, 2024
5709d4a
Add support for the Databricks 14.3 pre-merge pipeline
NvTimLiu Sep 26, 2024
8979789
Updated ShimServiceProvider to return the correct Shim version
razajafri Sep 26, 2024
9274700
Use the correct shim for decimal multiplication
razajafri Oct 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions aggregator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,23 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>release350db</id>
<activation>
<property>
<name>buildver</name>
<value>350db</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-delta-spark350db_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>${spark.version.classifier}</classifier>
</dependency>
</dependencies>
</profile>
<profile>
<id>release351</id>
<activation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
{"spark": "342"}
{"spark": "343"}
{"spark": "350"}
{"spark": "350db"}
{"spark": "351"}
{"spark": "352"}
spark-rapids-shim-json-lines ***/
Expand Down
1 change: 1 addition & 0 deletions delta-lake/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and directory contains the corresponding support code.
| Databricks 11.3 | Databricks 11.3 | `delta-spark330db` |
| Databricks 12.2 | Databricks 12.2 | `delta-spark332db` |
| Databricks 13.3 | Databricks 13.3 | `delta-spark341db` |
| Databricks 14.3 | Databricks 14.3 | `delta-spark350db` |

Delta Lake is not supported on all Spark versions, and for Spark versions where it is not
supported the `delta-stub` project is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ object GpuDeltaLog {
dataPath: String,
options: Map[String, String],
rapidsConf: RapidsConf): GpuDeltaLog = {
val deltaLog = DeltaLog.forTable(spark, dataPath, options)
val deltaLog = DeltaLog.forTable(spark, new Path(dataPath), options)
new GpuDeltaLog(deltaLog, rapidsConf)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ class DeltaCreatableRelationProviderMeta(
}
val path = saveCmd.options.get("path")
if (path.isDefined) {
val deltaLog = DeltaLog.forTable(SparkSession.active, path.get, saveCmd.options)
val deltaLog = DeltaLog.forTable(SparkSession.active, new Path(path.get), saveCmd.options)
RapidsDeltaUtils.tagForDeltaWrite(this, saveCmd.query.schema, Some(deltaLog),
saveCmd.options, SparkSession.active)
} else {
Expand All @@ -346,4 +346,4 @@ class DeltaCreatableRelationProviderMeta(
}

override def convertToGpu(): GpuCreatableRelationProvider = new GpuDeltaDataSource(conf)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.databricks.sql.transaction.tahoe.rapids

import com.databricks.sql.transaction.tahoe._
import com.databricks.sql.transaction.tahoe.actions.FileAction
import com.databricks.sql.transaction.tahoe.constraints.{Constraint, DeltaInvariantCheckerExec}
import com.databricks.sql.transaction.tahoe.files.TahoeBatchFileIndex
import com.databricks.sql.transaction.tahoe.metering.DeltaLogging
import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf
import com.nvidia.spark.rapids._

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.rapids.GpuShuffleEnv
import org.apache.spark.sql.rapids.GpuV1WriteUtils.GpuEmpty2Null
import org.apache.spark.sql.rapids.delta.{DeltaShufflePartitionsUtil, GpuOptimizeWriteExchangeExec, OptimizeWriteExchangeExec}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.Clock

/**
* Used to perform a set of reads in a transaction and then commit a set of updates to the
* state of the log. All reads from the DeltaLog, MUST go through this instance rather
* than directly to the DeltaLog otherwise they will not be check for logical conflicts
* with concurrent updates.
*
* This class is not thread-safe.
*
* @param deltaLog The Delta Log for the table this transaction is modifying.
* @param snapshot The snapshot that this transaction is reading at.
* @param rapidsConf RAPIDS Accelerator config settings.
*/
abstract class GpuOptimisticTransactionBase
(deltaLog: DeltaLog, snapshot: Snapshot, val rapidsConf: RapidsConf)
(implicit clock: Clock)
extends OptimisticTransaction(deltaLog, snapshot)(clock)
with DeltaLogging {

/**
* Adds checking of constraints on the table
* @param plan Plan to generate the table to check against constraints
* @param constraints Constraints to check on the table
* @return GPU columnar plan to execute
*/
protected def addInvariantChecks(plan: SparkPlan, constraints: Seq[Constraint]): SparkPlan = {
val cpuInvariants =
DeltaInvariantCheckerExec.buildInvariantChecks(plan.output, constraints, plan.session)
GpuCheckDeltaInvariant.maybeConvertToGpu(cpuInvariants, rapidsConf) match {
case Some(gpuInvariants) =>
val gpuPlan = convertToGpu(plan)
GpuDeltaInvariantCheckerExec(gpuPlan, gpuInvariants)
case None =>
val cpuPlan = convertToCpu(plan)
DeltaInvariantCheckerExec(cpuPlan, constraints)
}
}

/** GPU version of convertEmptyToNullIfNeeded */
private def gpuConvertEmptyToNullIfNeeded(
plan: GpuExec,
partCols: Seq[Attribute],
constraints: Seq[Constraint]): SparkPlan = {
if (!spark.conf.get(DeltaSQLConf.CONVERT_EMPTY_TO_NULL_FOR_STRING_PARTITION_COL)) {
return plan
}
// No need to convert if there are no constraints. The empty strings will be converted later by
// FileFormatWriter and FileFormatDataWriter. Note that we might still do unnecessary convert
// here as the constraints might not be related to the string partition columns. A precise
// check will need to walk the constraints to see if such columns are really involved. It
// doesn't seem to worth the effort.
if (constraints.isEmpty) return plan

val partSet = AttributeSet(partCols)
var needConvert = false
val projectList: Seq[NamedExpression] = plan.output.map {
case p if partSet.contains(p) && p.dataType == StringType =>
needConvert = true
GpuAlias(GpuEmpty2Null(p), p.name)()
case attr => attr
}
if (needConvert) GpuProjectExec(projectList.toList, plan) else plan
}

/**
* If there is any string partition column and there are constraints defined, add a projection to
* convert empty string to null for that column. The empty strings will be converted to null
* eventually even without this convert, but we want to do this earlier before check constraints
* so that empty strings are correctly rejected. Note that this should not cause the downstream
* logic in `FileFormatWriter` to add duplicate conversions because the logic there checks the
* partition column using the original plan's output. When the plan is modified with additional
* projections, the partition column check won't match and will not add more conversion.
*
* @param plan The original SparkPlan.
* @param partCols The partition columns.
* @param constraints The defined constraints.
* @return A SparkPlan potentially modified with an additional projection on top of `plan`
*/
override def convertEmptyToNullIfNeeded(
plan: SparkPlan,
partCols: Seq[Attribute],
constraints: Seq[Constraint]): SparkPlan = {
// Reuse the CPU implementation if the plan ends up on the CPU, otherwise do the
// equivalent on the GPU.
plan match {
case g: GpuExec => gpuConvertEmptyToNullIfNeeded(g, partCols, constraints)
case _ => super.convertEmptyToNullIfNeeded(plan, partCols, constraints)
}
}

override def writeFiles(
inputData: Dataset[_],
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
writeFiles(inputData, None, additionalConstraints)
}

protected def applyOptimizeWriteIfNeeded(
spark: SparkSession,
physicalPlan: SparkPlan,
partitionSchema: StructType,
isOptimize: Boolean): SparkPlan = {
val optimizeWriteEnabled = !isOptimize &&
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED)
.orElse(DeltaConfigs.OPTIMIZE_WRITE.fromMetaData(metadata)).getOrElse(false)
if (optimizeWriteEnabled) {
val planWithoutTopRepartition =
DeltaShufflePartitionsUtil.removeTopRepartition(physicalPlan)
val partitioning = DeltaShufflePartitionsUtil.partitioningForRebalance(
physicalPlan.output, partitionSchema, spark.sessionState.conf.numShufflePartitions)
planWithoutTopRepartition match {
case p: GpuExec =>
val partMeta = GpuOverrides.wrapPart(partitioning, rapidsConf, None)
partMeta.tagForGpu()
if (partMeta.canThisBeReplaced) {
val plan = GpuOptimizeWriteExchangeExec(partMeta.convertToGpu(), p)
if (GpuShuffleEnv.useGPUShuffle(rapidsConf)) {
GpuCoalesceBatches(plan, TargetSize(rapidsConf.gpuTargetBatchSizeBytes))
} else {
GpuShuffleCoalesceExec(plan, rapidsConf.gpuTargetBatchSizeBytes)
}
} else {
GpuColumnarToRowExec(OptimizeWriteExchangeExec(partitioning, p))
}
case p =>
OptimizeWriteExchangeExec(partitioning, p)
}
} else {
physicalPlan
}
}

protected def isOptimizeCommand(plan: LogicalPlan): Boolean = {
val leaves = plan.collectLeaves()
leaves.size == 1 && leaves.head.collect {
case LogicalRelation(HadoopFsRelation(
index: TahoeBatchFileIndex, _, _, _, _, _), _, _, _) =>
index.actionType.equals("Optimize")
}.headOption.getOrElse(false)
}

protected def convertToCpu(plan: SparkPlan): SparkPlan = plan match {
case GpuRowToColumnarExec(p, _) => p
case p: GpuExec => GpuColumnarToRowExec(p)
case p => p
}

protected def convertToGpu(plan: SparkPlan): SparkPlan = plan match {
case GpuColumnarToRowExec(p, _) => p
case p: GpuExec => p
case p => GpuRowToColumnarExec(p, TargetSize(rapidsConf.gpuTargetBatchSizeBytes))
}
}
Loading
Loading