Add deletion vector metrics for low shuffle merge. (NVIDIA#11132)
* Add deletion vector metrics

* Add for databricks

Signed-off-by: liurenjie1024 <[email protected]>

* Fix comments

* Fix comments

---------

Signed-off-by: liurenjie1024 <[email protected]>
liurenjie1024 authored Jul 9, 2024
1 parent 6f36d35 commit 29904a3
Showing 5 changed files with 57 additions and 27 deletions.
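
For context before the per-file diffs: the commit introduces two GPU metrics, a nanosecond timer (deletionVectorScatterTime) covering the scatter of deletion-vector positions onto the row-deletion metadata column, and a size counter (deletionVectorSize) accumulating the bytes of each file's serialized deletion vector, and threads them from GpuFileSourceScanExec into the Delta Parquet file formats. Below is a minimal standalone sketch of the two update patterns the diff relies on; SimpleMetric is an illustrative stand-in, not the plugin's GpuMetric, but the .ns { ... } timing block and the += accumulation mirror how the real metrics are updated in the changed files.

// Illustrative stand-in for GpuMetric, showing the two update patterns used
// in this commit: timing a block in nanoseconds and accumulating a size.
final class SimpleMetric(val name: String) {
  private var total: Long = 0L

  // Time a block and add the elapsed nanoseconds to the running total.
  def ns[T](block: => T): T = {
    val start = System.nanoTime()
    try block finally { total += System.nanoTime() - start }
  }

  // Accumulate a plain value, e.g. the byte length of a serialized deletion vector.
  def +=(v: Long): Unit = total += v

  def value: Long = total
}

object SimpleMetricExample {
  def main(args: Array[String]): Unit = {
    val scatterTime = new SimpleMetric("deletionVectorScatterTime")
    val dvSize = new SimpleMetric("deletionVectorSize")

    dvSize += 128                                        // e.g. inlineData.length
    val result = scatterTime.ns { (0 until 1000).sum }   // timed work

    println(s"result=$result scatterNs=${scatterTime.value} dvBytes=${dvSize.value}")
  }
}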
@@ -17,8 +17,8 @@
package com.nvidia.spark.rapids.delta

import ai.rapids.cudf.{ColumnVector => CudfColumnVector, Scalar, Table}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuMetric}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuColumnVector
import org.roaringbitmap.longlong.{PeekableLongIterator, Roaring64Bitmap}

import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
@@ -53,7 +53,9 @@ object GpuDeltaParquetFileFormatUtils {
schema: StructType,
delVector: Option[Roaring64Bitmap],
input: Iterator[ColumnarBatch],
maxBatchSize: Int): Iterator[ColumnarBatch] = {
maxBatchSize: Int,
delVectorScatterTimeMetric: GpuMetric
): Iterator[ColumnarBatch] = {
val metadataRowIndexCol = schema.fieldNames.indexOf(METADATA_ROW_IDX_COL)
val delRowIdx = schema.fieldNames.indexOf(METADATA_ROW_DEL_COL)
if (metadataRowIndexCol == -1 && delRowIdx == -1) {
@@ -74,20 +76,31 @@ object GpuDeltaParquetFileFormatUtils {
Some(delRowIdx)
}
val newBatch = addMetadataColumns(rowIdxCol, delRowIdx2, delVector,maxBatchSize,
rowIndex, batch)
rowIndex, batch, delVectorScatterTimeMetric)
rowIndex += batch.numRows()
newBatch
}
}
}

private def createFalseTable(numRows: Int): Table = {
withResource(Scalar.fromBool(false)) { s =>
withResource(CudfColumnVector.fromScalar(s, numRows)) { c =>
new Table(c)
}
}
}


private def addMetadataColumns(
rowIdxPos: Option[Int],
delRowIdx: Option[Int],
delVec: Option[Roaring64Bitmap],
maxBatchSize: Int,
rowIdxStart: Long,
batch: ColumnarBatch): ColumnarBatch = {
batch: ColumnarBatch,
delVectorScatterTimeMetric: GpuMetric,
): ColumnarBatch = {
val rowIdxCol = rowIdxPos.map { _ =>
withResource(Scalar.fromLong(rowIdxStart)) { start =>
GpuColumnVector.from(CudfColumnVector.sequence(start, batch.numRows()),
@@ -98,30 +111,26 @@ object GpuDeltaParquetFileFormatUtils {
closeOnExcept(rowIdxCol) { rowIdxCol =>

val delVecCol = delVec.map { delVec =>
withResource(Scalar.fromBool(false)) { s =>
withResource(CudfColumnVector.fromScalar(s, batch.numRows())) { c =>
var table = new Table(c)
val posIter = new RoaringBitmapIterator(
delVec.getLongIteratorFrom(rowIdxStart),
rowIdxStart,
rowIdxStart + batch.numRows(),
).grouped(Math.min(maxBatchSize, batch.numRows()))

for (posChunk <- posIter) {
withResource(CudfColumnVector.fromLongs(posChunk: _*)) { poses =>
withResource(Scalar.fromBool(true)) { s =>
table = withResource(table) { _ =>
delVectorScatterTimeMetric.ns {
val table = new RoaringBitmapIterator(
delVec.getLongIteratorFrom(rowIdxStart),
rowIdxStart,
rowIdxStart + batch.numRows())
.grouped(Math.min(maxBatchSize, batch.numRows()))
.foldLeft(createFalseTable(batch.numRows())){ (table, posChunk) =>
withResource(table) { _ =>
withResource(CudfColumnVector.fromLongs(posChunk: _*)) { poses =>
withResource(Scalar.fromBool(true)) { s =>
Table.scatter(Array(s), poses, table)
}
}
}
}

withResource(table) { _ =>
GpuColumnVector.from(table.getColumn(0).incRefCount(),
METADATA_ROW_DEL_FIELD.dataType)
}
}
withResource(table) { _ =>
GpuColumnVector.from(table.getColumn(0).incRefCount(),
METADATA_ROW_DEL_FIELD.dataType)
}
}
}
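
To summarize the refactor in the hunk above: the per-batch deletion column is now built by seeding an all-false table (the new createFalseTable helper) and folding over chunks of deleted row positions drawn from the Roaring64Bitmap, with the whole loop wrapped in delVectorScatterTimeMetric.ns so its cost lands in the new timer. The sketch below reproduces that shape in plain Scala, using the real org.roaringbitmap Roaring64Bitmap API but a Boolean vector in place of a cudf Table; the helper names and chunking values are illustrative.

import scala.collection.mutable.ArrayBuffer
import org.roaringbitmap.longlong.Roaring64Bitmap

object ScatterSketch {
  // Plain-Scala analogue of the refactored scatter loop: seed an all-false
  // column, then fold over chunks of deleted positions, marking them true.
  def buildDeletionColumn(dv: Roaring64Bitmap,
                          rowIdxStart: Long,
                          numRows: Int,
                          maxChunk: Int): Vector[Boolean] = {
    // Collect deleted row offsets that fall inside this batch, the same range
    // RoaringBitmapIterator walks between rowIdxStart and rowIdxStart + numRows.
    val it = dv.getLongIteratorFrom(rowIdxStart)
    val positions = ArrayBuffer.empty[Int]
    var done = false
    while (!done && it.hasNext) {
      val pos = it.next()
      if (pos < rowIdxStart + numRows) positions += (pos - rowIdxStart).toInt
      else done = true
    }

    positions.grouped(math.min(maxChunk, numRows))
      .foldLeft(Vector.fill(numRows)(false)) { (col, chunk) =>
        // "Scatter" true at the deleted positions of this chunk; each step
        // yields a new column, mirroring Table.scatter returning a new Table.
        chunk.foldLeft(col)((c, i) => c.updated(i, true))
      }
  }

  def main(args: Array[String]): Unit = {
    val dv = new Roaring64Bitmap()
    Seq(101L, 103L, 107L).foreach(dv.addLong)
    val col = buildDeletionColumn(dv, rowIdxStart = 100L, numRows = 10, maxChunk = 2)
    println(col.mkString(","))   // true at offsets 1, 3 and 7
  }
}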

@@ -88,15 +88,22 @@ case class GpuDelta24xParquetFileFormat(
val maxDelVecScatterBatchSize = RapidsConf
.DELTA_LOW_SHUFFLE_MERGE_SCATTER_DEL_VECTOR_BATCH_SIZE
.get(sparkSession.sessionState.conf)
val delVecScatterTimeMetric = metrics(GpuMetric.DELETION_VECTOR_SCATTER_TIME)
val delVecSizeMetric = metrics(GpuMetric.DELETION_VECTOR_SIZE)


(file: PartitionedFile) => {
val input = dataReader(file)
val dv = delVecs.flatMap(_.value.get(new URI(file.filePath.toString())))
.map(dv => RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner)
.map { dv =>
delVecSizeMetric += dv.descriptor.inlineData.length
RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner
}
addMetadataColumnToIterator(prepareSchema(requiredSchema),
dv,
input.asInstanceOf[Iterator[ColumnarBatch]],
maxDelVecScatterBatchSize)
maxDelVecScatterBatchSize,
delVecScatterTimeMetric)
.asInstanceOf[Iterator[InternalRow]]
}
}
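
The other half of this reader change, repeated in the next file's GpuDeltaParquetFileFormat, is the size metric: the byte length of the inline deletion vector is added to deletionVectorSize before the bitmap is deserialized, giving a per-scan total of serialized deletion-vector bytes. A small hedged sketch of that accumulation follows, with a stand-in descriptor type and a plain callback in place of the GpuMetric.

// Stand-in types for illustration only; the real code reads inlineData from
// the Delta deletion-vector descriptor and updates a GpuMetric.
object DvSizeSketch {
  final case class DvDescriptor(inlineData: Array[Byte])

  def loadDeletionVectorBytes(descriptor: Option[DvDescriptor],
                              addToSizeMetric: Long => Unit): Option[Array[Byte]] =
    descriptor.map { d =>
      addToSizeMetric(d.inlineData.length.toLong)  // deletionVectorSize += bytes
      d.inlineData                                 // caller then deserializes these bytes
    }

  def main(args: Array[String]): Unit = {
    var dvSize = 0L
    val bytes = loadDeletionVectorBytes(Some(DvDescriptor(new Array[Byte](64))),
      (n: Long) => dvSize += n)
    println(s"dvSize=$dvSize gotBytes=${bytes.map(_.length)}")
  }
}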
@@ -85,14 +85,21 @@ case class GpuDeltaParquetFileFormat(
.DELTA_LOW_SHUFFLE_MERGE_SCATTER_DEL_VECTOR_BATCH_SIZE
.get(sparkSession.sessionState.conf)

val delVecScatterTimeMetric = metrics(GpuMetric.DELETION_VECTOR_SCATTER_TIME)
val delVecSizeMetric = metrics(GpuMetric.DELETION_VECTOR_SIZE)

(file: PartitionedFile) => {
val input = dataReader(file)
val dv = delVecs.flatMap(_.value.get(new URI(file.filePath.toString())))
.map(dv => RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner)
.map { dv =>
delVecSizeMetric += dv.descriptor.inlineData.length
RoaringBitmapWrapper.deserializeFromBytes(dv.descriptor.inlineData).inner
}
addMetadataColumnToIterator(prepareSchema(requiredSchema),
dv,
input.asInstanceOf[Iterator[ColumnarBatch]],
maxDelVecScatterBatchSize
maxDelVecScatterBatchSize,
delVecScatterTimeMetric
).asInstanceOf[Iterator[InternalRow]]
}
}
@@ -83,6 +83,8 @@ object GpuMetric extends Logging {
val FILECACHE_DATA_RANGE_MISSES_SIZE = "filecacheDataRangeMissesSize"
val FILECACHE_FOOTER_READ_TIME = "filecacheFooterReadTime"
val FILECACHE_DATA_RANGE_READ_TIME = "filecacheDataRangeReadTime"
val DELETION_VECTOR_SCATTER_TIME = "deletionVectorScatterTime"
val DELETION_VECTOR_SIZE = "deletionVectorSize"

// Metric Descriptions.
val DESCRIPTION_BUFFER_TIME = "buffer time"
@@ -117,6 +119,8 @@ object GpuMetric extends Logging {
val DESCRIPTION_FILECACHE_DATA_RANGE_MISSES_SIZE = "cached data misses size"
val DESCRIPTION_FILECACHE_FOOTER_READ_TIME = "cached footer read time"
val DESCRIPTION_FILECACHE_DATA_RANGE_READ_TIME = "cached data read time"
val DESCRIPTION_DELETION_VECTOR_SCATTER_TIME = "deletion vector scatter time"
val DESCRIPTION_DELETION_VECTOR_SIZE = "deletion vector size"

def unwrap(input: GpuMetric): SQLMetric = input match {
case w :WrappedGpuMetric => w.sqlMetric
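
The two new keys above follow the existing key-plus-description pattern and are registered as a nanosecond timing metric and a size metric in GpuFileSourceScanExec (last file below). Underneath, GpuMetric wraps ordinary Spark SQLMetrics, so the standalone sketch below creates equivalent raw metrics with Spark's SQLMetrics helpers; it bypasses the plugin's wrappers and exec-node registration and is for illustration only.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.metric.SQLMetrics

object DeletionVectorMetricsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("dv-metrics").getOrCreate()
    val sc = spark.sparkContext

    // Raw SQLMetric equivalents of the two new keys; the plugin wraps these
    // in GpuMetric, but the underlying accumulators behave like this.
    val scatterTime = SQLMetrics.createNanoTimingMetric(sc, "deletion vector scatter time")
    val dvSize = SQLMetrics.createSizeMetric(sc, "deletion vector size")

    dvSize.add(4096L)                     // bytes of one serialized deletion vector
    val start = System.nanoTime()
    Thread.sleep(5)                       // stand-in for the scatter work
    scatterTime.add(System.nanoTime() - start)

    println(s"scatter=${scatterTime.value} ns, dvSize=${dvSize.value} bytes")
    spark.stop()
  }
}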
@@ -424,7 +424,10 @@ case class GpuFileSourceScanExec(
"filesSize" -> createSizeMetric(ESSENTIAL_LEVEL, "size of files read"),
GPU_DECODE_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_GPU_DECODE_TIME),
BUFFER_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_BUFFER_TIME),
FILTER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_FILTER_TIME)
FILTER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_FILTER_TIME),
DELETION_VECTOR_SCATTER_TIME -> createNanoTimingMetric(MODERATE_LEVEL,
DESCRIPTION_DELETION_VECTOR_SCATTER_TIME),
DELETION_VECTOR_SIZE -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_DELETION_VECTOR_SIZE)
) ++ fileCacheMetrics ++ {
relation.fileFormat match {
case _: GpuReadParquetFileFormat | _: GpuOrcFileFormat =>
