Commit

Merge remote-tracking branch 'origin/branch-24.08' into HEAD
razajafri committed Jul 9, 2024
2 parents 8ffc4f1 + 29904a3 commit f599413
Showing 22 changed files with 227 additions and 123 deletions.
@@ -17,8 +17,8 @@
package com.nvidia.spark.rapids.delta

import ai.rapids.cudf.{ColumnVector => CudfColumnVector, Scalar, Table}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuMetric}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuColumnVector
import org.roaringbitmap.longlong.{PeekableLongIterator, Roaring64Bitmap}

import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
@@ -53,7 +53,9 @@ object GpuDeltaParquetFileFormatUtils {
schema: StructType,
delVector: Option[Roaring64Bitmap],
input: Iterator[ColumnarBatch],
maxBatchSize: Int): Iterator[ColumnarBatch] = {
maxBatchSize: Int,
delVectorScatterTimeMetric: GpuMetric
): Iterator[ColumnarBatch] = {
val metadataRowIndexCol = schema.fieldNames.indexOf(METADATA_ROW_IDX_COL)
val delRowIdx = schema.fieldNames.indexOf(METADATA_ROW_DEL_COL)
if (metadataRowIndexCol == -1 && delRowIdx == -1) {
@@ -74,20 +76,31 @@ object GpuDeltaParquetFileFormatUtils {
Some(delRowIdx)
}
val newBatch = addMetadataColumns(rowIdxCol, delRowIdx2, delVector,maxBatchSize,
rowIndex, batch)
rowIndex, batch, delVectorScatterTimeMetric)
rowIndex += batch.numRows()
newBatch
}
}
}

private def createFalseTable(numRows: Int): Table = {
withResource(Scalar.fromBool(false)) { s =>
withResource(CudfColumnVector.fromScalar(s, numRows)) { c =>
new Table(c)
}
}
}


private def addMetadataColumns(
rowIdxPos: Option[Int],
delRowIdx: Option[Int],
delVec: Option[Roaring64Bitmap],
maxBatchSize: Int,
rowIdxStart: Long,
batch: ColumnarBatch): ColumnarBatch = {
batch: ColumnarBatch,
delVectorScatterTimeMetric: GpuMetric,
): ColumnarBatch = {
val rowIdxCol = rowIdxPos.map { _ =>
withResource(Scalar.fromLong(rowIdxStart)) { start =>
GpuColumnVector.from(CudfColumnVector.sequence(start, batch.numRows()),
@@ -98,30 +111,26 @@ object GpuDeltaParquetFileFormatUtils {
closeOnExcept(rowIdxCol) { rowIdxCol =>

val delVecCol = delVec.map { delVec =>
withResource(Scalar.fromBool(false)) { s =>
withResource(CudfColumnVector.fromScalar(s, batch.numRows())) { c =>
var table = new Table(c)
val posIter = new RoaringBitmapIterator(
delVec.getLongIteratorFrom(rowIdxStart),
rowIdxStart,
rowIdxStart + batch.numRows(),
).grouped(Math.min(maxBatchSize, batch.numRows()))

for (posChunk <- posIter) {
withResource(CudfColumnVector.fromLongs(posChunk: _*)) { poses =>
withResource(Scalar.fromBool(true)) { s =>
table = withResource(table) { _ =>
delVectorScatterTimeMetric.ns {
val table = new RoaringBitmapIterator(
delVec.getLongIteratorFrom(rowIdxStart),
rowIdxStart,
rowIdxStart + batch.numRows())
.grouped(Math.min(maxBatchSize, batch.numRows()))
.foldLeft(createFalseTable(batch.numRows())){ (table, posChunk) =>
withResource(table) { _ =>
withResource(CudfColumnVector.fromLongs(posChunk: _*)) { poses =>
withResource(Scalar.fromBool(true)) { s =>
Table.scatter(Array(s), poses, table)
}
}
}
}

withResource(table) { _ =>
GpuColumnVector.from(table.getColumn(0).incRefCount(),
METADATA_ROW_DEL_FIELD.dataType)
}
}
withResource(table) { _ =>
GpuColumnVector.from(table.getColumn(0).incRefCount(),
METADATA_ROW_DEL_FIELD.dataType)
}
}
}
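
For reference, a minimal CPU-side sketch of the chunked scatter pattern in the hunk above: deleted-row positions are drawn from an iterator in bounded chunks and folded into an all-false mask, each step producing a new mask the way each Table.scatter produces a new Table from createFalseTable. The plain Vector stand-in and the names below are illustrative assumptions only, not the cudf API.

object ChunkedScatterSketch {
  // Build a Boolean deletion mask for numRows rows by "scattering" true at each
  // deleted-row position, processing positions in chunks of at most maxChunk
  // (mirroring the grouped/foldLeft structure in the diff, on a plain Vector).
  def buildMask(numRows: Int, deletedRows: Iterator[Long], maxChunk: Int): Vector[Boolean] = {
    val allFalse = Vector.fill(numRows)(false)            // analogue of createFalseTable
    deletedRows
      .grouped(math.min(maxChunk, numRows))                // bounded per-scatter chunk size
      .foldLeft(allFalse) { (mask, posChunk) =>
        // each chunk yields a new mask, as each Table.scatter yields a new Table
        posChunk.foldLeft(mask)((m, p) => m.updated(p.toInt, true))
      }
  }

  def main(args: Array[String]): Unit = {
    val mask = buildMask(numRows = 8, deletedRows = Iterator(1L, 4L, 6L), maxChunk = 2)
    println(mask.mkString(", "))  // false, true, false, false, true, false, true, false
  }
}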

@@ -88,15 +88,22 @@ case class GpuDelta24xParquetFileFormat(
val maxDelVecScatterBatchSize = RapidsConf
.DELTA_LOW_SHUFFLE_MERGE_SCATTER_DEL_VECTOR_BATCH_SIZE
.get(sparkSession.sessionState.conf)
val delVecScatterTimeMetric = metrics(GpuMetric.DELETION_VECTOR_SCATTER_TIME)
val delVecSizeMetric = metrics(GpuMetric.DELETION_VECTOR_SIZE)


(file: PartitionedFile) => {
val input = dataReader(file)
val dv = delVecs.flatMap(_.value.get(new URI(file.filePath.toString())))
.map(dv => RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner)
.map { dv =>
delVecSizeMetric += dv.descriptor.inlineData.length
RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner
}
addMetadataColumnToIterator(prepareSchema(requiredSchema),
dv,
input.asInstanceOf[Iterator[ColumnarBatch]],
maxDelVecScatterBatchSize)
maxDelVecScatterBatchSize,
delVecScatterTimeMetric)
.asInstanceOf[Iterator[InternalRow]]
}
}
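
As context for the metric wiring above, here is a simplified stand-in (an assumption for illustration, not the actual GpuMetric implementation) showing the two operations this hunk relies on: += to accumulate the serialized deletion-vector size, and ns { ... } to time the scatter work in nanoseconds.

// Simplified metric stand-in; the real code uses GpuMetric from spark-rapids.
class SimpleMetric(val name: String) {
  private var total: Long = 0L
  def +=(v: Long): Unit = total += v               // accumulate a count or byte size
  def ns[T](block: => T): T = {                    // time a block and add the elapsed nanoseconds
    val start = System.nanoTime()
    try block finally total += System.nanoTime() - start
  }
  def value: Long = total
}

object SimpleMetricDemo {
  def main(args: Array[String]): Unit = {
    val delVecSize = new SimpleMetric("deletion vector size")
    val scatterTime = new SimpleMetric("deletion vector scatter time")
    delVecSize += 128L                                   // e.g. one file's inlineData.length
    val result = scatterTime.ns { (1 to 1000).sum }      // timed work stands in for the scatter
    println(s"${delVecSize.name}=${delVecSize.value} bytes, " +
      s"${scatterTime.name}=${scatterTime.value} ns, result=$result")
  }
}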
@@ -85,14 +85,21 @@ case class GpuDeltaParquetFileFormat(
.DELTA_LOW_SHUFFLE_MERGE_SCATTER_DEL_VECTOR_BATCH_SIZE
.get(sparkSession.sessionState.conf)

val delVecScatterTimeMetric = metrics(GpuMetric.DELETION_VECTOR_SCATTER_TIME)
val delVecSizeMetric = metrics(GpuMetric.DELETION_VECTOR_SIZE)

(file: PartitionedFile) => {
val input = dataReader(file)
val dv = delVecs.flatMap(_.value.get(new URI(file.filePath.toString())))
.map(dv => RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner)
.map { dv =>
delVecSizeMetric += dv.descriptor.inlineData.length
RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner
}
addMetadataColumnToIterator(prepareSchema(requiredSchema),
dv,
input.asInstanceOf[Iterator[ColumnarBatch]],
maxDelVecScatterBatchSize
maxDelVecScatterBatchSize,
delVecScatterTimeMetric
).asInstanceOf[Iterator[InternalRow]]
}
}
@@ -24,7 +24,7 @@
# - ROCKY_VER: Rocky Linux OS version

ARG CUDA_VER=11.8.0
ARG UCX_VER=1.16.0
ARG UCX_VER=1.17.0
ARG UCX_CUDA_VER=11
ARG UCX_ARCH=x86_64
ARG ROCKY_VER=8
@@ -24,7 +24,7 @@
# - ROCKY_VER: Rocky Linux OS version

ARG CUDA_VER=11.8.0
ARG UCX_VER=1.16.0
ARG UCX_VER=1.17.0
ARG UCX_CUDA_VER=11
ARG UCX_ARCH=x86_64
ARG ROCKY_VER=8
@@ -25,7 +25,7 @@
#

ARG CUDA_VER=11.8.0
ARG UCX_VER=1.16.0
ARG UCX_VER=1.17.0
ARG UCX_CUDA_VER=11
ARG UCX_ARCH=x86_64
ARG UBUNTU_VER=20.04
@@ -35,7 +35,7 @@

ARG RDMA_CORE_VERSION=32.1
ARG CUDA_VER=11.8.0
ARG UCX_VER=1.16.0
ARG UCX_VER=1.17.0
ARG UCX_CUDA_VER=11
ARG UCX_ARCH=x86_64
ARG UBUNTU_VER=20.04
5 changes: 3 additions & 2 deletions integration_tests/requirements.txt
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,4 +17,5 @@ pandas
pyarrow
pytest-xdist >= 2.0.0
findspark
fastparquet == 0.8.3
fastparquet == 0.8.3 ; python_version == '3.8'
fastparquet == 2024.5.0 ; python_version >= '3.9'
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -30,8 +30,6 @@ def fastparquet_unavailable():
return False
except ImportError:
return True
except ValueError: # TODO: remove when https://github.com/NVIDIA/spark-rapids/issues/11070 is fixed
return True


rebase_write_corrected_conf = {
@@ -192,6 +192,7 @@ def write_to_hive_sql(spark, output_table):
_write_to_hive_conf)


@allow_non_gpu(*non_utc_allow)
@pytest.mark.skipif(is_before_spark_330() or (is_databricks_runtime() and not is_databricks122_or_later()),
reason="InsertIntoHiveTable supports bucketed write since Spark 330")
def test_insert_hive_bucketed_table(spark_tmp_table_factory):
11 changes: 11 additions & 0 deletions integration_tests/src/main/python/parquet_test.py
@@ -1492,3 +1492,14 @@ def test_parquet_column_name_with_dots(spark_tmp_path, reader_confs):
assert_gpu_and_cpu_are_equal_collect(lambda spark: reader(spark).selectExpr("`a.b`"), conf=all_confs)
assert_gpu_and_cpu_are_equal_collect(lambda spark: reader(spark).selectExpr("`a.b`.`c.d.e`.`f.g`"),
conf=all_confs)

def test_parquet_partition_batch_row_count_only_splitting(spark_tmp_path):
data_path = spark_tmp_path + "/PARQUET_DATA"
def setup_table(spark):
spark.range(1000).withColumn("p", f.lit("x")).coalesce(1)\
.write\
.partitionBy("p")\
.parquet(data_path)
with_cpu_session(lambda spark: setup_table(spark))
assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.read.parquet(data_path).select("p"),
conf={"spark.rapids.sql.columnSizeBytes": "100"})