@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.files

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.backendsapi.velox.VeloxBatchType
import org.apache.gluten.columnarbatch.VeloxColumnarBatches
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution._
import org.apache.gluten.execution.datasource.GlutenFormatFactory
@@ -45,6 +46,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SerializableConfiguration, Utils}

import org.apache.hadoop.conf.Configuration
@@ -562,6 +564,14 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
}
}.toArray

private val reservePartitionColumns: Boolean =
description.partitionColumns.exists {
pcol =>
description.dataColumns.exists {
dcol => dcol.name == pcol.name && dcol.exprId == pcol.exprId
}
}

private def beforeWrite(record: InternalRow): Unit = {
val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None
@@ -590,43 +600,87 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
record match {
case carrierRow: BatchCarrierRow =>
carrierRow match {
case placeholderRow: PlaceholderRow =>
case _: PlaceholderRow =>
// Do nothing.
case terminalRow: TerminalRow =>
val numRows = terminalRow.batch().numRows()
if (numRows > 0) {
val blockStripes = GlutenFormatFactory.rowSplitter
.splitBlockByPartitionAndBucket(
terminalRow.batch(),
partitionColIndice,
isBucketed)
val iter = blockStripes.iterator()
while (iter.hasNext) {
val blockStripe = iter.next()
val headingRow = blockStripe.getHeadingRow
beforeWrite(headingRow)
val currentColumnBatch = blockStripe.getColumnarBatch
val numRowsOfCurrentColumnarBatch = currentColumnBatch.numRows()
assert(numRowsOfCurrentColumnarBatch > 0)
val currentTerminalRow = terminalRow.withNewBatch(currentColumnBatch)
currentWriter.write(currentTerminalRow)
statsTrackers.foreach {
tracker =>
tracker.newRow(currentWriter.path, currentTerminalRow)
for (_ <- 0 until numRowsOfCurrentColumnarBatch - 1) {
tracker.newRow(currentWriter.path, new PlaceholderRow())
}
}
currentColumnBatch.close()
}
blockStripes.release()
recordsInFile += numRows
}
writePartitionedBatch(terminalRow)
}
case _ =>
beforeWrite(record)
writeRecord(record)
}
}

private def writeCurrentBatch(terminalRow: TerminalRow, rowCount: Int): Unit = {
assert(rowCount > 0)
currentWriter.write(terminalRow)
statsTrackers.foreach(_.newRow(currentWriter.path, terminalRow))
recordsInFile += rowCount
}

private def writeCurrentBatchWithMaxRecords(
terminalRow: TerminalRow,
columnBatch: ColumnarBatch): Unit = {
val numRows = columnBatch.numRows()
var offset = 0
while (offset < numRows) {
val rowsRemaining = numRows - offset
val rowsToWrite = if (description.maxRecordsPerFile > 0) {
if (recordsInFile >= description.maxRecordsPerFile) {
renewCurrentWriterIfTooManyRecords(currentPartitionValues, currentBucketId)
}
math.min(rowsRemaining.toLong, description.maxRecordsPerFile - recordsInFile).toInt
} else {
rowsRemaining
}

assert(rowsToWrite > 0)
val batchToWrite =
if (offset == 0 && rowsToWrite == numRows) {
columnBatch
} else {
VeloxColumnarBatches.slice(columnBatch, offset, rowsToWrite)
}
try {
writeCurrentBatch(terminalRow.withNewBatch(batchToWrite), rowsToWrite)
} finally {
if (batchToWrite ne columnBatch) {
batchToWrite.close()
}
}
offset += rowsToWrite
}
}

private def writePartitionStripe(terminalRow: TerminalRow, blockStripe: BlockStripe): Unit = {
beforeWrite(blockStripe.getHeadingRow)
val currentColumnBatch = blockStripe.getColumnarBatch
try {
assert(currentColumnBatch.numRows() > 0)
writeCurrentBatchWithMaxRecords(terminalRow, currentColumnBatch)
} finally {
currentColumnBatch.close()
}
}

private def writePartitionedBatch(terminalRow: TerminalRow): Unit = {
val numRows = terminalRow.batch().numRows()
if (numRows > 0) {
val blockStripes = GlutenFormatFactory.rowSplitter
.splitBlockByPartitionAndBucket(
terminalRow.batch(),
partitionColIndice,
isBucketed,
reservePartitionColumns)
try {
val iter = blockStripes.iterator()
while (iter.hasNext) {
writePartitionStripe(terminalRow, iter.next())
}
} finally {
blockStripes.release()
}
}
}
}
}
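
Note on the writer change above: the old branch wrote each partition stripe in a single `currentWriter.write` call and bumped `recordsInFile` once per input batch, so `maxRecordsPerFile` was not enforced inside a large stripe. The new `writeCurrentBatchWithMaxRecords` slices the stripe with `VeloxColumnarBatches.slice` and rolls the writer whenever the cap is reached. The standalone sketch below (illustrative names only, no Velox dependency, not part of the patch) mirrors the offset bookkeeping of that loop:

```scala
// Sketch of the slicing arithmetic used by writeCurrentBatchWithMaxRecords: given a batch of
// `numRows` rows, the count already written to the current file, and the per-file cap, compute
// the (offset, length) slices that would be written. Names are illustrative only.
object MaxRecordsSliceSketch {
  def sliceRanges(numRows: Int, recordsInFile: Long, maxRecordsPerFile: Long): Seq[(Int, Int)] = {
    var inFile = recordsInFile
    var offset = 0
    val slices = Seq.newBuilder[(Int, Int)]
    while (offset < numRows) {
      val remaining = numRows - offset
      val toWrite =
        if (maxRecordsPerFile > 0) {
          // Models renewCurrentWriterIfTooManyRecords: a full file resets the in-file counter.
          if (inFile >= maxRecordsPerFile) inFile = 0
          math.min(remaining.toLong, maxRecordsPerFile - inFile).toInt
        } else {
          remaining
        }
      slices += ((offset, toWrite))
      inFile += toWrite
      offset += toWrite
    }
    slices.result()
  }

  def main(args: Array[String]): Unit = {
    // A 10-row batch, 3 rows already in the open file, cap of 4 records per file:
    // prints Vector((0,1), (1,4), (5,4), (9,1)) - the first slice tops up the open file,
    // the remaining slices each start a fresh file of at most 4 rows.
    println(sliceRanges(numRows = 10, recordsInFile = 3, maxRecordsPerFile = 4))
  }
}
```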
@@ -152,6 +152,7 @@ object GlutenDeltaJobStatsTracker extends Logging {
}
private val statsAttrs = aggregates.flatMap(_.aggregateFunction.aggBufferAttributes)
private val statsResultAttrs = aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes)
private val dataColIndices = dataCols.indices.toArray
private val veloxAggTask: ColumnarBatchOutIterator = {
val inputNode = StatisticsInputNode(Seq(dummyKeyAttr), dataCols)
val aggOp = SortAggregateExec(
@@ -261,6 +262,16 @@
case t: TerminalRow =>
val valueBatch = t.batch()
val numRows = valueBatch.numRows()
val statsValueBatch = if (valueBatch.numCols() == dataCols.size) {
valueBatch
} else {
assert(
valueBatch.numCols() > dataCols.size,
s"Delta stats input has ${valueBatch.numCols()} columns, " +
s"but the stats schema needs ${dataCols.size} columns."
)
ColumnarBatches.select(BackendsApiManager.getBackendName, valueBatch, dataColIndices)
}
val dummyKeyVec = ArrowWritableColumnVector
.allocateColumns(numRows, new StructType().add(dummyKeyAttr.name, IntegerType))
.head
@@ -269,10 +280,15 @@
ColumnarBatches.offload(
ArrowBufferAllocators.contextInstance(),
new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, valueBatch)
dummyKeyBatch.close()
valueBatch.close()
inputBatchQueue.put(Some(compositeBatch))
try {
val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, statsValueBatch)
inputBatchQueue.put(Some(compositeBatch))
} finally {
dummyKeyBatch.close()
if (statsValueBatch ne valueBatch) {
statsValueBatch.close()
}
}
}
}
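
Note on the stats tracker change above: because the writer may now keep partition columns in the batch (`reservePartitionColumns`), the incoming stats batch can be wider than `dataCols`. The tracker therefore selects only the leading data columns before composing with the dummy key batch, and the intermediate batches are now closed in a `finally`. A minimal Spark-only sketch of the pruning idea (hypothetical helper name, not the actual `ColumnarBatches.select` implementation):

```scala
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

object StatsColumnPruneSketch {
  // Keep only the leading `numDataCols` columns of `batch`. The returned batch shares the
  // underlying column vectors with the input, so close exactly one of the two wrappers,
  // which is what the `statsValueBatch ne valueBatch` guard above accomplishes.
  def selectLeading(batch: ColumnarBatch, numDataCols: Int): ColumnarBatch = {
    require(
      batch.numCols() >= numDataCols,
      s"batch has ${batch.numCols()} columns, need at least $numDataCols")
    val kept: Array[ColumnVector] = Array.tabulate(numDataCols)(i => batch.column(i))
    new ColumnarBatch(kept, batch.numRows())
  }
}
```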

@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.files

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.backendsapi.velox.VeloxBatchType
import org.apache.gluten.columnarbatch.VeloxColumnarBatches
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution._
import org.apache.gluten.execution.datasource.GlutenFormatFactory
@@ -46,6 +47,7 @@ import org.apache.spark.sql.execution.datasources.FileFormatWriter._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SerializableConfiguration, Utils}

import org.apache.hadoop.conf.Configuration
@@ -557,6 +559,14 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
}
}.toArray

private val reservePartitionColumns: Boolean =
description.partitionColumns.exists {
pcol =>
description.dataColumns.exists {
dcol => dcol.name == pcol.name && dcol.exprId == pcol.exprId
}
}

private def beforeWrite(record: InternalRow): Unit = {
val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None
val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None
@@ -583,42 +593,88 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
record match {
case carrierRow: BatchCarrierRow =>
carrierRow match {
case placeholderRow: PlaceholderRow =>
case _: PlaceholderRow =>
// Do nothing.
case terminalRow: TerminalRow =>
val numRows = terminalRow.batch().numRows()
if (numRows > 0) {
val blockStripes = GlutenFormatFactory.rowSplitter
.splitBlockByPartitionAndBucket(terminalRow.batch(), partitionColIndice,
isBucketed)
val iter = blockStripes.iterator()
while (iter.hasNext) {
val blockStripe = iter.next()
val headingRow = blockStripe.getHeadingRow
beforeWrite(headingRow)
val currentColumnBatch = blockStripe.getColumnarBatch
val numRowsOfCurrentColumnarBatch = currentColumnBatch.numRows()
assert(numRowsOfCurrentColumnarBatch > 0)
val currentTerminalRow = terminalRow.withNewBatch(currentColumnBatch)
currentWriter.write(currentTerminalRow)
statsTrackers.foreach {
tracker =>
tracker.newRow(currentWriter.path, currentTerminalRow)
for (_ <- 0 until numRowsOfCurrentColumnarBatch - 1) {
tracker.newRow(currentWriter.path, new PlaceholderRow())
}
}
currentColumnBatch.close()
}
blockStripes.release()
recordsInFile += numRows
}
writePartitionedBatch(terminalRow)
}
case _ =>
beforeWrite(record)
writeRecord(record)
}
}

private def writeCurrentBatch(terminalRow: TerminalRow, rowCount: Int): Unit = {
assert(rowCount > 0)
currentWriter.write(terminalRow)
statsTrackers.foreach(_.newRow(currentWriter.path, terminalRow))
recordsInFile += rowCount
}

private def writeCurrentBatchWithMaxRecords(
terminalRow: TerminalRow,
columnBatch: ColumnarBatch): Unit = {
val numRows = columnBatch.numRows()
var offset = 0
while (offset < numRows) {
val rowsRemaining = numRows - offset
val rowsToWrite = if (description.maxRecordsPerFile > 0) {
if (recordsInFile >= description.maxRecordsPerFile) {
renewCurrentWriterIfTooManyRecords(currentPartitionValues, currentBucketId)
}
math.min(rowsRemaining.toLong, description.maxRecordsPerFile - recordsInFile).toInt
} else {
rowsRemaining
}

assert(rowsToWrite > 0)
val batchToWrite =
if (offset == 0 && rowsToWrite == numRows) {
columnBatch
} else {
VeloxColumnarBatches.slice(columnBatch, offset, rowsToWrite)
}
try {
writeCurrentBatch(terminalRow.withNewBatch(batchToWrite), rowsToWrite)
} finally {
if (batchToWrite ne columnBatch) {
batchToWrite.close()
}
}
offset += rowsToWrite
}
}

private def writePartitionStripe(terminalRow: TerminalRow, blockStripe: BlockStripe): Unit = {
beforeWrite(blockStripe.getHeadingRow)
val currentColumnBatch = blockStripe.getColumnarBatch
try {
assert(currentColumnBatch.numRows() > 0)
writeCurrentBatchWithMaxRecords(terminalRow, currentColumnBatch)
} finally {
currentColumnBatch.close()
}
}

private def writePartitionedBatch(terminalRow: TerminalRow): Unit = {
val numRows = terminalRow.batch().numRows()
if (numRows > 0) {
val blockStripes = GlutenFormatFactory.rowSplitter
.splitBlockByPartitionAndBucket(
terminalRow.batch(),
partitionColIndice,
isBucketed,
reservePartitionColumns)
try {
val iter = blockStripes.iterator()
while (iter.hasNext) {
writePartitionStripe(terminalRow, iter.next())
}
} finally {
blockStripes.release()
}
}
}
}
}
// spotless:on
@@ -156,6 +156,7 @@ object GlutenDeltaJobStatsTracker extends Logging {
}
private val statsAttrs = aggregates.flatMap(_.aggregateFunction.aggBufferAttributes)
private val statsResultAttrs = aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes)
private val dataColIndices = dataCols.indices.toArray
private val veloxAggTask: ColumnarBatchOutIterator = {
val inputNode = StatisticsInputNode(Seq(dummyKeyAttr), dataCols)
val aggOp = SortAggregateExec(
@@ -265,6 +266,16 @@
case t: TerminalRow =>
val valueBatch = t.batch()
val numRows = valueBatch.numRows()
val statsValueBatch = if (valueBatch.numCols() == dataCols.size) {
valueBatch
} else {
assert(
valueBatch.numCols() > dataCols.size,
s"Delta stats input has ${valueBatch.numCols()} columns, " +
s"but the stats schema needs ${dataCols.size} columns."
)
ColumnarBatches.select(BackendsApiManager.getBackendName, valueBatch, dataColIndices)
}
val dummyKeyVec = ArrowWritableColumnVector
.allocateColumns(numRows, new StructType().add(dummyKeyAttr.name, IntegerType))
.head
@@ -273,10 +284,15 @@
ColumnarBatches.offload(
ArrowBufferAllocators.contextInstance(),
new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, valueBatch)
dummyKeyBatch.close()
valueBatch.close()
inputBatchQueue.put(Some(compositeBatch))
try {
val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, statsValueBatch)
inputBatchQueue.put(Some(compositeBatch))
} finally {
dummyKeyBatch.close()
if (statsValueBatch ne valueBatch) {
statsValueBatch.close()
}
}
}
}
