diff --git a/backends-velox/src-delta24/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala b/backends-velox/src-delta24/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala new file mode 100644 index 000000000000..f5bb2479f18d --- /dev/null +++ b/backends-velox/src-delta24/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.backendsapi.velox + +import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName +import org.apache.gluten.sql.shims.SparkShimLoader + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor +import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, StoredBitmap} +import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore +import org.apache.spark.sql.execution.datasources.PartitionedFile + +import org.apache.hadoop.fs.Path + +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap} + +import scala.collection.JavaConverters._ +import scala.util.Try +import scala.util.control.NonFatal + +object VeloxDeltaMetadataUtils { + val DeltaDvCardinality = "delta_dv_cardinality" + val DeltaDvPayloadIndex = "delta_dv_payload_index" + + private val RowIndexFilterIdEncoded = "row_index_filter_id_encoded" + private val RowIndexFilterType = "row_index_filter_type" + private val RowIndexFilterTypeIfContained = "IF_CONTAINED" + + final class NormalizedSplitMetadata( + val otherMetadataColumns: JList[JMap[String, Object]], + val deletionVectorPayloads: Array[Array[Byte]]) + extends Serializable + + private def decodeDescriptor( + normalizedMetadata: JMap[String, Object]): Option[DeletionVectorDescriptor] = { + Option(normalizedMetadata.get(RowIndexFilterIdEncoded)) + .map(_.toString) + .filter(_.nonEmpty) + .flatMap(parseDescriptor) + } + + private def parseDescriptor(encodedDescriptor: String): Option[DeletionVectorDescriptor] = { + val methods = Seq("deserializeFromBase64", "fromJson") + methods.iterator + .map { + methodName => + Try { + val method = DeletionVectorDescriptor.getClass.getMethod(methodName, classOf[String]) + method + .invoke(DeletionVectorDescriptor, encodedDescriptor) + .asInstanceOf[DeletionVectorDescriptor] + }.toOption + } + .collectFirst { case Some(descriptor) => descriptor } + } + + private def serializePayload( + dvStore: HadoopFileSystemDVStore, + tablePath: Path, + descriptor: DeletionVectorDescriptor): Option[Array[Byte]] = { + if (tablePath == null) { + return None + } + Some( + StoredBitmap + .create(descriptor, tablePath) + .load(dvStore) + .serializeAsByteArray(RoaringBitmapArrayFormat.Portable)) + } + + 
private def normalizeMetadataWithDescriptor( + metadata: JMap[String, Object], + descriptor: DeletionVectorDescriptor): JMap[String, Object] = { + val normalized = new JHashMap[String, Object]() + if (metadata != null) { + normalized.putAll(metadata) + } + normalized.put(DeltaDvCardinality, Long.box(descriptor.cardinality)) + normalized.remove(RowIndexFilterIdEncoded) + if (!normalized.containsKey(RowIndexFilterType)) { + normalized.put(RowIndexFilterType, RowIndexFilterTypeIfContained) + } + normalized + } + + def normalizeSplitMetadata( + partitionColumnCount: Int, + files: JList[PartitionedFile]): NormalizedSplitMetadata = { + val dvStore = new HadoopFileSystemDVStore(activeSpark.sessionState.newHadoopConf()) + val normalizedMetadataColumns = new JArrayList[JMap[String, Object]](files.size()) + val deletionVectorPayloads = scala.collection.mutable.ArrayBuffer.empty[Array[Byte]] + + files.asScala.foreach { + file => + val otherMetadata = + SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file) + val metadataWithDecodedPayload = new JHashMap[String, Object]() + if (otherMetadata != null) { + metadataWithDecodedPayload.putAll(otherMetadata) + } + + val descriptor = decodeDescriptor(metadataWithDecodedPayload) + + descriptor match { + case Some(descriptor) => + val normalized = normalizeMetadataWithDescriptor(metadataWithDecodedPayload, descriptor) + val payloadTablePath = resolveTablePath(partitionColumnCount, file) + val serializedPayload = serializePayload(dvStore, payloadTablePath, descriptor) + serializedPayload.foreach { + payload => + normalized.put(DeltaDvPayloadIndex, Int.box(deletionVectorPayloads.length)) + deletionVectorPayloads += payload + } + normalizedMetadataColumns.add(normalized) + case None => + normalizedMetadataColumns.add(metadataWithDecodedPayload) + } + } + + new NormalizedSplitMetadata(normalizedMetadataColumns, deletionVectorPayloads.toArray) + } + + private def activeSpark: SparkSession = { + SparkSession.getActiveSession + .orElse(SparkSession.getDefaultSession) + .getOrElse { + throw new IllegalStateException( + "Active SparkSession is required to materialize Delta deletion vectors") + } + } + + private def resolveTablePath(partitionColumnCount: Int, file: PartitionedFile): Path = { + val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent + var tablePath = fileParent + for (_ <- 0 until partitionColumnCount) { + tablePath = tablePath.getParent + } + val spark = activeSpark + if (tablePath != null && isDeltaTablePath(spark, tablePath)) { + return tablePath + } + + // Spark can report a partition column count that does not map 1:1 to path depth for + // prepared Delta scans. Find the nearest ancestor of the file path that has _delta_log. 
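+    // Path.getParent eventually returns null at the filesystem root, so the walk below is bounded; + // when no ancestor holds a _delta_log directory, the depth-based guess computed above is kept.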
+ var candidate = fileParent + while (candidate != null && !isDeltaTablePath(spark, candidate)) { + candidate = candidate.getParent + } + if (candidate != null) candidate else tablePath + } + + private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean = { + val deltaLogPath = new Path(tablePath, "_delta_log") + try { + deltaLogPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(deltaLogPath) + } catch { + case NonFatal(_) => false + } + } +} diff --git a/backends-velox/src-delta33/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala b/backends-velox/src-delta33/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala new file mode 100644 index 000000000000..27279fc64ca9 --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.backendsapi.velox + +import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName +import org.apache.gluten.sql.shims.SparkShimLoader + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor +import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, StoredBitmap} +import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore +import org.apache.spark.sql.execution.datasources.PartitionedFile + +import org.apache.hadoop.fs.Path + +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap} + +import scala.collection.JavaConverters._ +import scala.util.Try +import scala.util.control.NonFatal + +object VeloxDeltaMetadataUtils { + val DeltaDvCardinality = "delta_dv_cardinality" + val DeltaDvPayloadIndex = "delta_dv_payload_index" + + private val RowIndexFilterIdEncoded = "row_index_filter_id_encoded" + private val RowIndexFilterType = "row_index_filter_type" + private val RowIndexFilterTypeIfContained = "IF_CONTAINED" + + final class NormalizedSplitMetadata( + val otherMetadataColumns: JList[JMap[String, Object]], + val deletionVectorPayloads: Array[Array[Byte]]) + extends Serializable + + private def decodeDescriptor( + normalizedMetadata: JMap[String, Object]): Option[DeletionVectorDescriptor] = { + Option(normalizedMetadata.get(RowIndexFilterIdEncoded)) + .map(_.toString) + .filter(_.nonEmpty) + .flatMap(encoded => Try(DeletionVectorDescriptor.deserializeFromBase64(encoded)).toOption) + } + + private def serializePayload( + dvStore: HadoopFileSystemDVStore, + tablePath: Path, + descriptor: DeletionVectorDescriptor): Option[Array[Byte]] = { + if (tablePath == null) { + return None + } + Some( + StoredBitmap + .create(descriptor, tablePath) + .load(dvStore) + 
.serializeAsByteArray(RoaringBitmapArrayFormat.Portable)) + } + + private def normalizeMetadataWithDescriptor( + metadata: JMap[String, Object], + descriptor: DeletionVectorDescriptor): JMap[String, Object] = { + val normalized = new JHashMap[String, Object]() + if (metadata != null) { + normalized.putAll(metadata) + } + normalized.put(DeltaDvCardinality, Long.box(descriptor.cardinality)) + normalized.remove(RowIndexFilterIdEncoded) + if (!normalized.containsKey(RowIndexFilterType)) { + normalized.put(RowIndexFilterType, RowIndexFilterTypeIfContained) + } + normalized + } + + def normalizeSplitMetadata( + partitionColumnCount: Int, + files: JList[PartitionedFile]): NormalizedSplitMetadata = { + val dvStore = new HadoopFileSystemDVStore(activeSpark.sessionState.newHadoopConf()) + val normalizedMetadataColumns = new JArrayList[JMap[String, Object]](files.size()) + val deletionVectorPayloads = scala.collection.mutable.ArrayBuffer.empty[Array[Byte]] + + files.asScala.foreach { + file => + val otherMetadata = + SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file) + val metadataWithDecodedPayload = new JHashMap[String, Object]() + if (otherMetadata != null) { + metadataWithDecodedPayload.putAll(otherMetadata) + } + + val descriptor = decodeDescriptor(metadataWithDecodedPayload) + + descriptor match { + case Some(descriptor) => + val normalized = normalizeMetadataWithDescriptor(metadataWithDecodedPayload, descriptor) + val payloadTablePath = resolveTablePath(partitionColumnCount, file) + val serializedPayload = serializePayload(dvStore, payloadTablePath, descriptor) + serializedPayload.foreach { + payload => + normalized.put(DeltaDvPayloadIndex, Int.box(deletionVectorPayloads.length)) + deletionVectorPayloads += payload + } + normalizedMetadataColumns.add(normalized) + case None => + normalizedMetadataColumns.add(metadataWithDecodedPayload) + } + } + + new NormalizedSplitMetadata(normalizedMetadataColumns, deletionVectorPayloads.toArray) + } + + private def activeSpark: SparkSession = { + SparkSession.getActiveSession + .orElse(SparkSession.getDefaultSession) + .getOrElse { + throw new IllegalStateException( + "Active SparkSession is required to materialize Delta deletion vectors") + } + } + + private def resolveTablePath(partitionColumnCount: Int, file: PartitionedFile): Path = { + val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent + var tablePath = fileParent + for (_ <- 0 until partitionColumnCount) { + tablePath = tablePath.getParent + } + val spark = activeSpark + if (tablePath != null && isDeltaTablePath(spark, tablePath)) { + return tablePath + } + + // Spark can report a partition column count that does not map 1:1 to path depth for + // prepared Delta scans. Find the nearest ancestor of the file path that has _delta_log. 
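+    // Path.getParent eventually returns null at the filesystem root, so the walk below is bounded; + // when no ancestor holds a _delta_log directory, the depth-based guess computed above is kept.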
+ var candidate = fileParent + while (candidate != null && !isDeltaTablePath(spark, candidate)) { + candidate = candidate.getParent + } + if (candidate != null) candidate else tablePath + } + + private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean = { + val deltaLogPath = new Path(tablePath, "_delta_log") + try { + deltaLogPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(deltaLogPath) + } catch { + case NonFatal(_) => false + } + } +} diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala new file mode 100644 index 000000000000..4c353c4a5758 --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal} +import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.delta.DeltaParquetFileFormat._ +import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable +import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex} +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.types.StructType + +/** + * Rewrites Delta scans over DV-enabled tables to request the backend-specific skip-row metadata + * column only when the snapshot actually contains DVs. 
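+ * A matched scan is rebuilt with the synthetic skip-row column, wrapped in a Filter that keeps + * only rows equal to RowIndexFilter.KEEP_ROW_VALUE, and projected back to the original output.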
+ */ +trait PreprocessTableWithDVs extends SubqueryTransformerHelper { + def preprocessTablesWithDVs(plan: LogicalPlan): LogicalPlan = { + transformWithSubqueries(plan) { case ScanWithDeletionVectors(dvScan) => dvScan } + } +} + +object ScanWithDeletionVectors { + def unapply(a: LogicalRelation): Option[LogicalPlan] = a match { + case scan @ LogicalRelation( + relation @ HadoopFsRelation( + index: TahoeFileIndex, + _, + _, + _, + format: DeltaParquetFileFormat, + _), + _, + _, + _) => + dvEnabledScanFor(scan, relation, format, index) + case scan @ LogicalRelation( + relation @ HadoopFsRelation( + index: TahoeFileIndex, + _, + _, + _, + format: GlutenDeltaParquetFileFormat, + _), + _, + _, + _) => + dvEnabledScanFor(scan, relation, format, index) + case _ => None + } + + def dvEnabledScanFor( + scan: LogicalRelation, + hadoopRelation: HadoopFsRelation, + fileFormat: DeltaParquetFileFormat, + index: TahoeFileIndex): Option[LogicalPlan] = { + if (!deletionVectorsReadable(index.protocol, index.metadata)) { + return None + } + + require( + !index.isInstanceOf[TahoeLogFileIndex], + "Cannot work with a non-pinned table snapshot of the TahoeFileIndex") + + if (fileFormat.hasTablePath) { + return None + } + + val filesWithDVs = index + .matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral)) + .filter(_.deletionVector != null) + if (filesWithDVs.isEmpty) { + return None + } + + val planOutput = scan.output + val spark = SparkSession.getActiveSession.get + val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation) + val rowIndexFilter = createRowIndexFilterNode(newScan) + Some(Project(planOutput, rowIndexFilter)) + } + + def dvEnabledScanFor( + scan: LogicalRelation, + hadoopRelation: HadoopFsRelation, + fileFormat: GlutenDeltaParquetFileFormat, + index: TahoeFileIndex): Option[LogicalPlan] = { + if (!deletionVectorsReadable(index.protocol, index.metadata)) { + return None + } + + require( + !index.isInstanceOf[TahoeLogFileIndex], + "Cannot work with a non-pinned table snapshot of the TahoeFileIndex") + + if (fileFormat.hasTablePath) { + return None + } + + val filesWithDVs = index + .matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral)) + .filter(_.deletionVector != null) + if (filesWithDVs.isEmpty) { + return None + } + + val planOutput = scan.output + val spark = SparkSession.getActiveSession.get + val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation) + val rowIndexFilter = createRowIndexFilterNode(newScan) + Some(Project(planOutput, rowIndexFilter)) + } + + private def addRowIndexIfMissing(attribute: AttributeReference): AttributeReference = { + require(attribute.name == METADATA_NAME) + + val dataType = attribute.dataType.asInstanceOf[StructType] + if (dataType.fieldNames.contains(ParquetFileFormat.ROW_INDEX)) { + return attribute + } + + val newDatatype = dataType.add(ParquetFileFormat.ROW_INDEX_FIELD) + attribute.copy(dataType = newDatatype)( + exprId = attribute.exprId, + qualifier = attribute.qualifier) + } + + private def createScanWithSkipRowColumn( + spark: SparkSession, + inputScan: LogicalRelation, + fileFormat: DeltaParquetFileFormat, + tahoeFileIndex: TahoeFileIndex, + hadoopFsRelation: HadoopFsRelation): LogicalRelation = { + val useMetadataRowIndex = + spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX) + + val skipRowField = IS_ROW_DELETED_STRUCT_FIELD + val scanOutputWithMetadata = if (useMetadataRowIndex) { + if 
(inputScan.output.map(_.name).contains(METADATA_NAME)) { + inputScan.output.collect { + case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a) + case o => o + } + } else { + inputScan.output :+ fileFormat.createFileMetadataCol() + } + } else { + inputScan.output + } + + val newScanOutput = + scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)() + val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField) + val newFileFormat = fileFormat.copyWithDVInfo( + tablePath = tahoeFileIndex.path.toString, + optimizationsEnabled = useMetadataRowIndex) + + val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)( + hadoopFsRelation.sparkSession) + + inputScan.copy(relation = newRelation, output = newScanOutput) + } + + private def createScanWithSkipRowColumn( + spark: SparkSession, + inputScan: LogicalRelation, + fileFormat: GlutenDeltaParquetFileFormat, + tahoeFileIndex: TahoeFileIndex, + hadoopFsRelation: HadoopFsRelation): LogicalRelation = { + val useMetadataRowIndex = + spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX) + + val skipRowField = GlutenDeltaParquetFileFormat.IS_ROW_DELETED_STRUCT_FIELD + val scanOutputWithMetadata = if (useMetadataRowIndex) { + if (inputScan.output.map(_.name).contains(METADATA_NAME)) { + inputScan.output.collect { + case a: AttributeReference if a.name == METADATA_NAME => addRowIndexIfMissing(a) + case o => o + } + } else { + inputScan.output :+ fileFormat.createFileMetadataCol() + } + } else { + inputScan.output + } + + val newScanOutput = + scanOutputWithMetadata :+ AttributeReference(skipRowField.name, skipRowField.dataType)() + val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField) + val newFileFormat = fileFormat.copyWithDVInfo( + tablePath = tahoeFileIndex.path.toString, + optimizationsEnabled = useMetadataRowIndex) + + val newRelation = hadoopFsRelation.copy(fileFormat = newFileFormat, dataSchema = newDataSchema)( + hadoopFsRelation.sparkSession) + + inputScan.copy(relation = newRelation, output = newScanOutput) + } + + private def createRowIndexFilterNode(newScan: LogicalRelation): Filter = { + val skipRowColumnRefs = newScan.output.filter(_.name == IS_ROW_DELETED_COLUMN_NAME) + require( + skipRowColumnRefs.size == 1, + s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME") + val skipRowColumnRef = skipRowColumnRefs.head + Filter(EqualTo(skipRowColumnRef, Literal(RowIndexFilter.KEEP_ROW_VALUE)), newScan) + } +} diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala new file mode 100644 index 000000000000..37a46147432e --- /dev/null +++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta.stats + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.delta.{DeltaTable, OptimisticTransaction, PreprocessTableWithDVs} +import org.apache.spark.sql.delta.sources.DeltaSQLConf + +/** Shadow Delta's PrepareDeltaScan to inject backend-specific DV preprocessing. */ +class PrepareDeltaScan(protected val spark: SparkSession) + extends Rule[LogicalPlan] + with PrepareDeltaScanBase + with PreprocessTableWithDVs { + + override def apply(plan0: LogicalPlan): LogicalPlan = { + var plan = plan0 + + val isSubquery = isSubqueryRoot(plan) + val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand] + if (isSubquery || isDataSourceV2) { + return plan + } + + val updatedPlan = if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING)) { + if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED)) { + plan = optimizeQueryWithMetadata(plan) + } + prepareDeltaScan(plan) + } else { + OptimisticTransaction.getActive.foreach { + txn => + val logsInPlan = plan.collect { case DeltaTable(fileIndex) => fileIndex.deltaLog } + if (logsInPlan.exists(_.isSameLogAs(txn.deltaLog))) { + txn.readWholeTable() + } + } + plan + } + + preprocessTablesWithDVs(updatedPlan) + } +} diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala new file mode 100644 index 000000000000..9a73b97d5730 --- /dev/null +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.delta + +import org.apache.gluten.backendsapi.velox.VeloxDeltaMetadataUtils +import org.apache.gluten.backendsapi.velox.VeloxDeltaMetadataUtils.{DeltaDvCardinality, DeltaDvPayloadIndex} + +import org.apache.spark.paths.SparkPath +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.tags.ExtendedSQLTest + +import org.apache.hadoop.fs.Path + +import scala.collection.JavaConverters._ + +@ExtendedSQLTest +class DeltaDeletionVectorHandoffSuite + extends QueryTest + with SharedSparkSession + with DeltaSQLTestUtils + with DeltaSQLCommandTest { + + import testImplicits._ + + test("Spark 3.5 Delta DV handoff should materialize serialized payloads from scan metadata") { + withTempDir { + tempDir => + val path = tempDir.getCanonicalPath + Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")) + .toDF("id", "value") + .coalesce(1) + .write + .format("delta") + .save(path) + + spark.sql( + s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") + spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)") + + val log = DeltaLog.forTable(spark, new Path(path)) + val addFileWithDv = log.update().allFiles.collect().find(_.deletionVector != null) + assert(addFileWithDv.nonEmpty) + + val dataFile = addFileWithDv.get + val basePartitionedFile = PartitionedFile( + partitionValues = InternalRow.empty, + filePath = SparkPath.fromPath(new Path(path, dataFile.path)), + start = 0L, + length = dataFile.size, + fileSize = dataFile.size) + val partitionedFile = basePartitionedFile.copy( + otherConstantMetadataColumnValues = Map[String, Object]( + GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED -> + dataFile.deletionVector.serializeToBase64(), + GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED" + )) + val normalized = VeloxDeltaMetadataUtils.normalizeSplitMetadata( + partitionColumnCount = 0, + files = Seq(partitionedFile).asJava) + val metadata = normalized.otherMetadataColumns.get(0) + + assert(normalized.deletionVectorPayloads.length == 1) + assert(normalized.deletionVectorPayloads.head.nonEmpty) + assert(metadata.get(DeltaDvPayloadIndex) == Int.box(0)) + assert(metadata.get(DeltaDvCardinality) == Long.box(dataFile.deletionVector.cardinality)) + assert(!metadata.containsKey(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED)) + } + } + + test("Spark 3.5 Delta DV handoff should skip payload materialization without scan metadata") { + withTempDir { + tempDir => + val path = tempDir.getCanonicalPath + Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")) + .toDF("id", "value") + .coalesce(1) + .write + .format("delta") + .save(path) + + spark.sql( + s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") + spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)") + + val log = DeltaLog.forTable(spark, new Path(path)) + val addFileWithDv = log.update().allFiles.collect().find(_.deletionVector != null) + assert(addFileWithDv.nonEmpty) + + val dataFile = addFileWithDv.get + val partitionedFile = PartitionedFile( + partitionValues = InternalRow.empty, + filePath = SparkPath.fromPath(new Path(path, dataFile.path)), + start = 0L, + length = dataFile.size, + fileSize = dataFile.size) + val normalized = 
VeloxDeltaMetadataUtils.normalizeSplitMetadata( + partitionColumnCount = 0, + files = Seq(partitionedFile).asJava) + val metadata = normalized.otherMetadataColumns.get(0) + + assert(normalized.deletionVectorPayloads.isEmpty) + assert(!metadata.containsKey(DeltaDvPayloadIndex)) + assert(!metadata.containsKey(DeltaDvCardinality)) + } + } +} diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala index f265168ddbd6..fa475cd294be 100644 --- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala +++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala @@ -197,6 +197,34 @@ class DeltaSuite checkAnswer(data.toDF(), Row(1) :: Row(2) :: Row(3) :: Row(4) :: Row(5) :: Row(6) :: Nil) } + test("fallback DV scan when metadata row index is disabled") { + withTempDir { + tempDir => + val path = tempDir.getCanonicalPath + Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")) + .toDF("id", "value") + .coalesce(1) + .write + .format("delta") + .save(path) + + spark.sql( + s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") + + withSQLConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key -> "false") { + spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)") + + val log = DeltaLog.forTable(spark, new Path(path)) + assert(log.update().allFiles.collect().exists(_.deletionVector != null)) + + val df = spark.read.format("delta").load(path) + val executedPlan = df.queryExecution.executedPlan + assert(executedPlan.collect { case _: DeltaScanTransformer => true }.isEmpty) + checkAnswer(df, Seq(Row(1, "a"), Row(2, "b"))) + } + } + } + test("partitioned append - nulls") { val tempDir = Utils.createTempDir() Seq(Some(1), None) diff --git a/backends-velox/src-delta40/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala b/backends-velox/src-delta40/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala new file mode 100644 index 000000000000..f5bb2479f18d --- /dev/null +++ b/backends-velox/src-delta40/main/scala/org/apache/gluten/backendsapi/velox/VeloxDeltaMetadataUtils.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.backendsapi.velox + +import org.apache.gluten.backendsapi.velox.VeloxIteratorApi.unescapePathName +import org.apache.gluten.sql.shims.SparkShimLoader + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor +import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, StoredBitmap} +import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore +import org.apache.spark.sql.execution.datasources.PartitionedFile + +import org.apache.hadoop.fs.Path + +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap} + +import scala.collection.JavaConverters._ +import scala.util.Try +import scala.util.control.NonFatal + +object VeloxDeltaMetadataUtils { + val DeltaDvCardinality = "delta_dv_cardinality" + val DeltaDvPayloadIndex = "delta_dv_payload_index" + + private val RowIndexFilterIdEncoded = "row_index_filter_id_encoded" + private val RowIndexFilterType = "row_index_filter_type" + private val RowIndexFilterTypeIfContained = "IF_CONTAINED" + + final class NormalizedSplitMetadata( + val otherMetadataColumns: JList[JMap[String, Object]], + val deletionVectorPayloads: Array[Array[Byte]]) + extends Serializable + + private def decodeDescriptor( + normalizedMetadata: JMap[String, Object]): Option[DeletionVectorDescriptor] = { + Option(normalizedMetadata.get(RowIndexFilterIdEncoded)) + .map(_.toString) + .filter(_.nonEmpty) + .flatMap(parseDescriptor) + } + + private def parseDescriptor(encodedDescriptor: String): Option[DeletionVectorDescriptor] = { + val methods = Seq("deserializeFromBase64", "fromJson") + methods.iterator + .map { + methodName => + Try { + val method = DeletionVectorDescriptor.getClass.getMethod(methodName, classOf[String]) + method + .invoke(DeletionVectorDescriptor, encodedDescriptor) + .asInstanceOf[DeletionVectorDescriptor] + }.toOption + } + .collectFirst { case Some(descriptor) => descriptor } + } + + private def serializePayload( + dvStore: HadoopFileSystemDVStore, + tablePath: Path, + descriptor: DeletionVectorDescriptor): Option[Array[Byte]] = { + if (tablePath == null) { + return None + } + Some( + StoredBitmap + .create(descriptor, tablePath) + .load(dvStore) + .serializeAsByteArray(RoaringBitmapArrayFormat.Portable)) + } + + private def normalizeMetadataWithDescriptor( + metadata: JMap[String, Object], + descriptor: DeletionVectorDescriptor): JMap[String, Object] = { + val normalized = new JHashMap[String, Object]() + if (metadata != null) { + normalized.putAll(metadata) + } + normalized.put(DeltaDvCardinality, Long.box(descriptor.cardinality)) + normalized.remove(RowIndexFilterIdEncoded) + if (!normalized.containsKey(RowIndexFilterType)) { + normalized.put(RowIndexFilterType, RowIndexFilterTypeIfContained) + } + normalized + } + + def normalizeSplitMetadata( + partitionColumnCount: Int, + files: JList[PartitionedFile]): NormalizedSplitMetadata = { + val dvStore = new HadoopFileSystemDVStore(activeSpark.sessionState.newHadoopConf()) + val normalizedMetadataColumns = new JArrayList[JMap[String, Object]](files.size()) + val deletionVectorPayloads = scala.collection.mutable.ArrayBuffer.empty[Array[Byte]] + + files.asScala.foreach { + file => + val otherMetadata = + SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file) + val metadataWithDecodedPayload = new JHashMap[String, Object]() + if (otherMetadata != null) { + metadataWithDecodedPayload.putAll(otherMetadata) + } + + val descriptor = 
decodeDescriptor(metadataWithDecodedPayload) + + descriptor match { + case Some(descriptor) => + val normalized = normalizeMetadataWithDescriptor(metadataWithDecodedPayload, descriptor) + val payloadTablePath = resolveTablePath(partitionColumnCount, file) + val serializedPayload = serializePayload(dvStore, payloadTablePath, descriptor) + serializedPayload.foreach { + payload => + normalized.put(DeltaDvPayloadIndex, Int.box(deletionVectorPayloads.length)) + deletionVectorPayloads += payload + } + normalizedMetadataColumns.add(normalized) + case None => + normalizedMetadataColumns.add(metadataWithDecodedPayload) + } + } + + new NormalizedSplitMetadata(normalizedMetadataColumns, deletionVectorPayloads.toArray) + } + + private def activeSpark: SparkSession = { + SparkSession.getActiveSession + .orElse(SparkSession.getDefaultSession) + .getOrElse { + throw new IllegalStateException( + "Active SparkSession is required to materialize Delta deletion vectors") + } + } + + private def resolveTablePath(partitionColumnCount: Int, file: PartitionedFile): Path = { + val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent + var tablePath = fileParent + for (_ <- 0 until partitionColumnCount) { + tablePath = tablePath.getParent + } + val spark = activeSpark + if (tablePath != null && isDeltaTablePath(spark, tablePath)) { + return tablePath + } + + // Spark can report a partition column count that does not map 1:1 to path depth for + // prepared Delta scans. Find the nearest ancestor of the file path that has _delta_log. + var candidate = fileParent + while (candidate != null && !isDeltaTablePath(spark, candidate)) { + candidate = candidate.getParent + } + if (candidate != null) candidate else tablePath + } + + private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean = { + val deltaLogPath = new Path(tablePath, "_delta_log") + try { + deltaLogPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(deltaLogPath) + } catch { + case NonFatal(_) => false + } + } +} diff --git a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala new file mode 100644 index 000000000000..e626c00aa3a7 --- /dev/null +++ b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaDeletionVectorHandoffSuite.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.delta + +import org.apache.gluten.backendsapi.velox.VeloxDeltaMetadataUtils +import org.apache.gluten.backendsapi.velox.VeloxDeltaMetadataUtils.{DeltaDvCardinality, DeltaDvPayloadIndex} + +import org.apache.spark.paths.SparkPath +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.tags.ExtendedSQLTest + +import org.apache.hadoop.fs.Path + +import scala.collection.JavaConverters._ + +@ExtendedSQLTest +class DeltaDeletionVectorHandoffSuite + extends QueryTest + with SharedSparkSession + with DeltaSQLTestUtils + with DeltaSQLCommandTest { + + import testImplicits._ + + test("Spark 4 Delta DV handoff should materialize serialized payloads from scan metadata") { + withTempDir { + tempDir => + val path = tempDir.getCanonicalPath + Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")) + .toDF("id", "value") + .coalesce(1) + .write + .format("delta") + .save(path) + + spark.sql( + s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") + spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)") + + val log = DeltaLog.forTable(spark, new Path(path)) + val addFileWithDv = log.update().allFiles.collect().find(_.deletionVector != null) + assert(addFileWithDv.nonEmpty) + + val dataFile = addFileWithDv.get + val basePartitionedFile = PartitionedFile( + partitionValues = InternalRow.empty, + filePath = SparkPath.fromPath(new Path(path, dataFile.path)), + start = 0L, + length = dataFile.size, + fileSize = dataFile.size) + val partitionedFile = basePartitionedFile.copy( + otherConstantMetadataColumnValues = Map[String, Object]( + GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED -> + dataFile.deletionVector.serializeToBase64(), + GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED" + )) + val normalized = VeloxDeltaMetadataUtils.normalizeSplitMetadata( + partitionColumnCount = 0, + files = Seq(partitionedFile).asJava) + val metadata = normalized.otherMetadataColumns.get(0) + + assert(normalized.deletionVectorPayloads.length == 1) + assert(normalized.deletionVectorPayloads.head.nonEmpty) + assert(metadata.get(DeltaDvPayloadIndex) == Int.box(0)) + assert(metadata.get(DeltaDvCardinality) == Long.box(dataFile.deletionVector.cardinality)) + assert(!metadata.containsKey(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED)) + + val df = spark.read.format("delta").load(path) + checkAnswer(df, Seq((1, "a"), (2, "b")).toDF()) + } + } + + test("Spark 4 Delta DV handoff should skip payload materialization without scan metadata") { + withTempDir { + tempDir => + val path = tempDir.getCanonicalPath + Seq((1, "a"), (2, "b"), (3, "c"), (4, "d")) + .toDF("id", "value") + .coalesce(1) + .write + .format("delta") + .save(path) + + spark.sql( + s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") + spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)") + + val log = DeltaLog.forTable(spark, new Path(path)) + val addFileWithDv = log.update().allFiles.collect().find(_.deletionVector != null) + assert(addFileWithDv.nonEmpty) + + val dataFile = addFileWithDv.get + val partitionedFile = PartitionedFile( + partitionValues = InternalRow.empty, + filePath = SparkPath.fromPath(new Path(path, dataFile.path)), + start = 0L, + length = 
dataFile.size, + fileSize = dataFile.size) + val normalized = VeloxDeltaMetadataUtils.normalizeSplitMetadata( + partitionColumnCount = 0, + files = Seq(partitionedFile).asJava) + val metadata = normalized.otherMetadataColumns.get(0) + + assert(normalized.deletionVectorPayloads.isEmpty) + assert(!metadata.containsKey(DeltaDvPayloadIndex)) + assert(!metadata.containsKey(DeltaDvCardinality)) + } + } +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 8a800232443c..dd8eed9bc860 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -26,6 +26,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.plan.PlanNode import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo} import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat +import org.apache.gluten.utils.DeltaDeletionVectorRegistry import org.apache.gluten.vectorized._ import org.apache.spark.{Partition, SparkConf, TaskContext} @@ -40,15 +41,21 @@ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SparkDirectoryUtil +import org.apache.hadoop.fs.Path + import java.lang.{Long => JLong} +import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import java.time.ZoneOffset -import java.util.UUID +import java.util.{ArrayList => JArrayList, HashMap => JHashMap, UUID} import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.util.Try class VeloxIteratorApi extends IteratorApi with Logging { + private val deltaMetadataUtilsClassName = + "org.apache.gluten.backendsapi.velox.VeloxDeltaMetadataUtils$" private def setFileSchemaForLocalFiles( localFilesNode: LocalFilesNode, @@ -94,10 +101,18 @@ class VeloxIteratorApi extends IteratorApi with Logging { val metadataColumns = partitionFiles .map( f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava) - val otherMetadataColumns = partitionFiles - .map(f => SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(f)) + val (otherMetadataColumns, deletionVectorPayloads) = + normalizeRegisteredDeltaSplitMetadata(partitionFiles, properties) + .orElse(normalizeDeltaSplitMetadata(partitionSchema.fields.length, partitionFiles)) + .getOrElse { + ( + partitionFiles.map { + f => SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(f) + }, + Array.empty[Array[Byte]]) + } - setFileSchemaForLocalFiles( + val localFiles = setFileSchemaForLocalFiles( LocalFilesBuilder.makeLocalFiles( partitionIndex, paths.asJava, @@ -115,6 +130,12 @@ class VeloxIteratorApi extends IteratorApi with Logging { dataSchema, fileFormat ) + + if (deletionVectorPayloads.nonEmpty) { + VeloxSplitInfoWithPayloads(localFiles, deletionVectorPayloads) + } else { + localFiles + } } /** Generate native row partition. 
*/ @@ -179,6 +200,120 @@ class VeloxIteratorApi extends IteratorApi with Logging { NativePlanEvaluator.injectWriteFilesTempPath(path, fileName) } + private def buildSplitPayloadBuffers(splitInfos: Array[SplitInfo]): Array[Array[ByteBuffer]] = { + val payloadBuffers = splitInfos.map { + case splitInfoWithPayloads: VeloxSplitInfoWithPayloads + if splitInfoWithPayloads.deletionVectorPayloads.nonEmpty => + splitInfoWithPayloads.deletionVectorPayloads.map(toDirectByteBuffer) + case _ => + null + } + if (payloadBuffers.exists(_ != null)) payloadBuffers else null + } + + private def toDirectByteBuffer(bytes: Array[Byte]): ByteBuffer = { + val directBuffer = ByteBuffer.allocateDirect(bytes.length) + directBuffer.put(bytes) + directBuffer.flip() + directBuffer + } + + private def normalizeDeltaSplitMetadata( + partitionColumnCount: Int, + partitionFiles: Seq[PartitionedFile]) + : Option[(Seq[java.util.Map[String, Object]], Array[Array[Byte]])] = { + try { + // scalastyle:off classforname + val moduleClass = Class.forName(deltaMetadataUtilsClassName) + // scalastyle:on classforname + val module = moduleClass.getField("MODULE$").get(null) + val normalizeMethod = + moduleClass.getMethod("normalizeSplitMetadata", classOf[Int], classOf[java.util.List[_]]) + val normalized = + normalizeMethod.invoke(module, Int.box(partitionColumnCount), partitionFiles.asJava) + val metadataMethod = normalized.getClass.getMethod("otherMetadataColumns") + val payloadsMethod = normalized.getClass.getMethod("deletionVectorPayloads") + Some( + metadataMethod + .invoke(normalized) + .asInstanceOf[java.util.List[java.util.Map[String, Object]]] + .asScala + .toSeq, + payloadsMethod.invoke(normalized).asInstanceOf[Array[Array[Byte]]] + ) + } catch { + case _: ClassNotFoundException | _: NoSuchMethodException => + None + } + } + + private def normalizeRegisteredDeltaSplitMetadata( + partitionFiles: Seq[PartitionedFile], + properties: Map[String, String]) + : Option[(Seq[java.util.Map[String, Object]], Array[Array[Byte]])] = { + properties + .get(DeltaDeletionVectorRegistry.RegistryIdProperty) + .flatMap(DeltaDeletionVectorRegistry.get) + .flatMap { + registeredEntries => + val normalizedMetadataColumns = new JArrayList[java.util.Map[String, Object]]() + val deletionVectorPayloads = mutable.ArrayBuffer.empty[Array[Byte]] + var matchedDeletionVectors = 0 + partitionFiles.foreach { + file => + val metadata = new JHashMap[String, Object]() + val baseMetadata = + SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file) + if (baseMetadata != null) { + metadata.putAll(baseMetadata) + } + lookupRegisteredDeltaDeletionVector(file, registeredEntries).foreach { + entry => + metadata.put("delta_dv_cardinality", Long.box(entry.cardinality)) + metadata.put("row_index_filter_type", entry.filterType) + metadata.put("delta_dv_payload_index", Int.box(deletionVectorPayloads.length)) + deletionVectorPayloads += entry.payload + matchedDeletionVectors += 1 + } + normalizedMetadataColumns.add(metadata) + } + if (matchedDeletionVectors == 0) { + None + } else { + Some((normalizedMetadataColumns.asScala.toSeq, deletionVectorPayloads.toArray)) + } + } + } + + private def lookupRegisteredDeltaDeletionVector( + file: PartitionedFile, + registeredEntries: Map[String, DeltaDeletionVectorRegistry.Entry]) + : Option[DeltaDeletionVectorRegistry.Entry] = { + deltaDeletionVectorPathCandidates(file).iterator + .map(registeredEntries.get) + .collectFirst { case Some(entry) => entry } + } + + private def deltaDeletionVectorPathCandidates(file: 
PartitionedFile): Seq[String] = { + val rawPath = unescapePathName(file.filePath.toString) + val path = new Path(rawPath) + val pathUri = partitionedFilePathUri(file) + Seq( + pathUri.map(_.toASCIIString), + pathUri.map(_.getPath), + Some(rawPath), + Some(path.toUri.toASCIIString), + Some(path.toUri.getPath), + Some(rawPath.stripPrefix("/")) + ).flatten + .map(DeltaDeletionVectorRegistry.normalizePathKey(_)) + .filter(_.nonEmpty) + .distinct + } + + private def partitionedFilePathUri(file: PartitionedFile): Option[java.net.URI] = + Try(file.getClass.getMethod("pathUri").invoke(file).asInstanceOf[java.net.URI]).toOption + /** Generate Iterator[ColumnarBatch] for first stage. */ override def genFirstStageIterator( inputPartition: BaseGlutenPartition, @@ -205,6 +340,8 @@ class VeloxIteratorApi extends IteratorApi with Logging { .splitInfos .map(splitInfo => splitInfo.toProtobuf.toByteArray) .toArray + val splitPayloadBuffers = + buildSplitPayloadBuffers(inputPartition.asInstanceOf[GlutenPartition].splitInfos) val spillDirPath = SparkDirectoryUtil .get() .namespace("gluten-spill") @@ -214,6 +351,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { transKernel.createKernelWithBatchIterator( inputPartition.plan, if (splitInfoByteArray.nonEmpty) splitInfoByteArray else null, + splitPayloadBuffers, if (columnarNativeIterators.nonEmpty) columnarNativeIterators.toArray else null, partitionIndex, BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) @@ -268,6 +406,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { transKernel.createKernelWithBatchIterator( rootNode.toProtobuf.toByteArray, null, + null, if (columnarNativeIterator.nonEmpty) columnarNativeIterator.toArray else null, partitionIndex, BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSplitInfoWithPayloads.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSplitInfoWithPayloads.scala new file mode 100644 index 000000000000..c34fd89da7da --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSplitInfoWithPayloads.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.backendsapi.velox + +import org.apache.gluten.substrait.rel.SplitInfo + +import com.google.protobuf.Message + +final case class VeloxSplitInfoWithPayloads( + delegate: SplitInfo, + deletionVectorPayloads: Array[Array[Byte]]) + extends SplitInfo { + + override def preferredLocations(): java.util.List[String] = delegate.preferredLocations() + + override def toProtobuf(): Message = delegate.toProtobuf() +} diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h index 4ab944898bd6..b8d6fd3e18af 100644 --- a/cpp/core/compute/Runtime.h +++ b/cpp/core/compute/Runtime.h @@ -97,6 +97,8 @@ class Runtime : public std::enable_shared_from_this<Runtime> { throw GlutenException("Not implemented"); } + virtual void setSplitPayloads(int32_t idx, std::vector<std::string_view> payloads) {} + virtual std::string planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) { throw GlutenException("Not implemented"); } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 9e194be6ea84..c01d574ebc71 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -19,6 +19,7 @@ #include #include #include +#include <limits> #include "compute/Runtime.h" #include "config/GlutenConfig.h" @@ -463,6 +464,7 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith jobject wrapper, jbyteArray planArr, jobjectArray splitInfosArr, + jobjectArray splitPayloadsArr, jobjectArray batchItrArray, jint stageId, jint partitionId, @@ -493,6 +495,30 @@ Java_org_apache_gluten_vectorized_PlanEvaluatorJniWrapper_nativeCreateKernelWith auto safeSplitArray = getByteArrayElementsSafe(env, splitInfoArray); auto splitInfoData = safeSplitArray.elems(); + if (splitPayloadsArr != nullptr) { + jobjectArray splitPayloadArray = static_cast<jobjectArray>(env->GetObjectArrayElement(splitPayloadsArr, i)); + if (splitPayloadArray != nullptr) { + std::vector<std::string_view> splitPayloads; + splitPayloads.reserve(env->GetArrayLength(splitPayloadArray)); + for (jsize payloadIndex = 0, payloadCount = env->GetArrayLength(splitPayloadArray); + payloadIndex < payloadCount; + ++payloadIndex) { + jobject payloadBuffer = env->GetObjectArrayElement(splitPayloadArray, payloadIndex); + GLUTEN_CHECK(payloadBuffer != nullptr, "Split payload buffer must not be null"); + auto* payloadData = reinterpret_cast<const char*>(env->GetDirectBufferAddress(payloadBuffer)); + const auto payloadCapacity = env->GetDirectBufferCapacity(payloadBuffer); + GLUTEN_CHECK(payloadData != nullptr, "Split payload buffer must be a direct ByteBuffer"); + GLUTEN_CHECK( + payloadCapacity >= 0 && payloadCapacity <= std::numeric_limits<int32_t>::max(), + "Split payload buffer capacity must fit int32_t"); + splitPayloads.push_back({payloadData, static_cast<size_t>(payloadCapacity)}); + env->DeleteLocalRef(payloadBuffer); + } + ctx->setSplitPayloads(i, std::move(splitPayloads)); + env->DeleteLocalRef(splitPayloadArray); + } + } + ctx->parseSplitInfo(splitInfoData, splitInfoSize, i); } } diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index f3ffab59a6a8..fe4420469080 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -17,7 +17,10 @@ #include "VeloxPlanConverter.h" #include +#include <google/protobuf/wrappers.pb.h> +#include <optional> +#include <string> #include "config/GlutenConfig.h" #include "iceberg/IcebergPlanConverter.h" #include "operators/plannodes/IteratorSplit.h" @@ -48,9 +51,41 @@ VeloxPlanConverter::VeloxPlanConverter( } namespace { +const std::string kDeltaDvPayloadIndex = "delta_dv_payload_index"; + +std::optional<std::string>
unpackMetadataValue(const google::protobuf::Any& value) { google::protobuf::BytesValue bytesValue; if (value.UnpackTo(&bytesValue)) { return bytesValue.value(); } + + google::protobuf::StringValue stringValue; + if (value.UnpackTo(&stringValue)) { + return stringValue.value(); + } + + google::protobuf::Int32Value int32Value; + if (value.UnpackTo(&int32Value)) { + return std::to_string(int32Value.value()); + } + + google::protobuf::Int64Value int64Value; + if (value.UnpackTo(&int64Value)) { + return std::to_string(int64Value.value()); + } + + google::protobuf::DoubleValue doubleValue; + if (value.UnpackTo(&doubleValue)) { + return std::to_string(doubleValue.value()); + } + + return std::nullopt; +} + std::shared_ptr<SplitInfo> parseScanSplitInfo( const facebook::velox::config::ConfigBase* veloxCfg, - const google::protobuf::RepeatedPtrField<::substrait::ReadRel_LocalFiles_FileOrFiles>& fileList) { + const google::protobuf::RepeatedPtrField<::substrait::ReadRel_LocalFiles_FileOrFiles>& fileList, + const std::vector<std::string_view>* splitPayloads) { using SubstraitFileFormatCase = ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase; auto splitInfo = std::make_shared<SplitInfo>(); @@ -61,6 +96,7 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo( splitInfo->partitionColumns.reserve(fileList.size()); splitInfo->properties.reserve(fileList.size()); splitInfo->metadataColumns.reserve(fileList.size()); + splitInfo->deletionVectorPayloads.reserve(fileList.size()); for (const auto& file : fileList) { // Expect all Partitions share the same index. splitInfo->partitionIndex = file.partition_index(); @@ -75,6 +111,25 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo( for (const auto& metadataColumn : file.metadata_columns()) { metadataColumnMap[metadataColumn.key()] = metadataColumn.value(); } + for (const auto& otherMetadataColumn : file.other_const_metadata_columns()) { + if (auto unpackedValue = unpackMetadataValue(otherMetadataColumn.value())) { + metadataColumnMap[otherMetadataColumn.key()] = std::move(*unpackedValue); + } + } + if (auto payloadIndexIt = metadataColumnMap.find(kDeltaDvPayloadIndex); payloadIndexIt != metadataColumnMap.end()) { + VELOX_USER_CHECK_NOT_NULL(splitPayloads, "Split payload index found without an external payload buffer"); + const auto payloadIndex = static_cast<size_t>(std::stoul(payloadIndexIt->second)); + VELOX_USER_CHECK_LT( + payloadIndex, + splitPayloads->size(), + "Split payload index {} is out of range for {} payload buffers", + payloadIndex, + splitPayloads->size()); + splitInfo->deletionVectorPayloads.emplace_back(splitPayloads->at(payloadIndex)); + metadataColumnMap.erase(payloadIndexIt); + } else { + splitInfo->deletionVectorPayloads.emplace_back(std::nullopt); + } splitInfo->metadataColumns.emplace_back(metadataColumnMap); splitInfo->paths.emplace_back(file.uri_file()); @@ -138,12 +193,16 @@ void parseLocalFileNodes( SubstraitToVeloxPlanConverter* planConverter, const facebook::velox::config::ConfigBase* veloxCfg, - std::vector<::substrait::ReadRel_LocalFiles>& localFiles) { + std::vector<::substrait::ReadRel_LocalFiles>& localFiles, + const std::unordered_map<int32_t, std::vector<std::string_view>>& splitPayloads) { std::vector<std::shared_ptr<SplitInfo>> splitInfos; splitInfos.reserve(localFiles.size()); - for (const auto& localFile : localFiles) { + for (size_t splitIndex = 0; splitIndex < localFiles.size(); ++splitIndex) { + const auto& localFile = localFiles[splitIndex]; const auto& fileList = localFile.items(); - splitInfos.push_back(parseScanSplitInfo(veloxCfg, fileList)); + auto payloadIt = splitPayloads.find(splitIndex); + splitInfos.push_back( + parseScanSplitInfo(veloxCfg, fileList, payloadIt == splitPayloads.end() ? 
@@ -138,12 +193,16 @@ std::shared_ptr<SplitInfo> parseScanSplitInfo(
 void parseLocalFileNodes(
     SubstraitToVeloxPlanConverter* planConverter,
     const facebook::velox::config::ConfigBase* veloxCfg,
-    std::vector<::substrait::ReadRel_LocalFiles>& localFiles) {
+    std::vector<::substrait::ReadRel_LocalFiles>& localFiles,
+    const std::unordered_map>& splitPayloads) {
   std::vector<std::shared_ptr<SplitInfo>> splitInfos;
   splitInfos.reserve(localFiles.size());
-  for (const auto& localFile : localFiles) {
+  for (size_t splitIndex = 0; splitIndex < localFiles.size(); ++splitIndex) {
+    const auto& localFile = localFiles[splitIndex];
     const auto& fileList = localFile.items();
-    splitInfos.push_back(parseScanSplitInfo(veloxCfg, fileList));
+    auto payloadIt = splitPayloads.find(splitIndex);
+    splitInfos.push_back(
+        parseScanSplitInfo(veloxCfg, fileList, payloadIt == splitPayloads.end() ? nullptr : &payloadIt->second));
   }
 
   planConverter->setSplitInfos(std::move(splitInfos));
@@ -152,9 +211,10 @@ void parseLocalFileNodes(
 std::shared_ptr<const facebook::velox::core::PlanNode> VeloxPlanConverter::toVeloxPlan(
     const ::substrait::Plan& substraitPlan,
-    std::vector<::substrait::ReadRel_LocalFiles> localFiles) {
+    std::vector<::substrait::ReadRel_LocalFiles> localFiles,
+    const std::unordered_map>& splitPayloads) {
   if (!validationMode_) {
-    parseLocalFileNodes(&substraitVeloxPlanConverter_, veloxCfg_, localFiles);
+    parseLocalFileNodes(&substraitVeloxPlanConverter_, veloxCfg_, localFiles, splitPayloads);
   }
 
   return substraitVeloxPlanConverter_.toVeloxPlan(substraitPlan);
diff --git a/cpp/velox/compute/VeloxPlanConverter.h b/cpp/velox/compute/VeloxPlanConverter.h
index 1aee2c36bd12..fa1ec0f9e04d 100644
--- a/cpp/velox/compute/VeloxPlanConverter.h
+++ b/cpp/velox/compute/VeloxPlanConverter.h
@@ -41,7 +41,8 @@ class VeloxPlanConverter {
   std::shared_ptr<const facebook::velox::core::PlanNode> toVeloxPlan(
       const ::substrait::Plan& substraitPlan,
-      std::vector<::substrait::ReadRel_LocalFiles> localFiles);
+      std::vector<::substrait::ReadRel_LocalFiles> localFiles,
+      const std::unordered_map>& splitPayloads = {});
 
   const std::unordered_map<facebook::velox::core::PlanNodeId, std::shared_ptr<SplitInfo>>& splitInfos() {
     return substraitVeloxPlanConverter_.splitInfos();
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index 0fa35f5d74c0..cc3ec1463d08 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -363,6 +363,13 @@ void VeloxRuntime::parseSplitInfo(const uint8_t* data, int32_t size, int32_t spl
   localFiles_.push_back(localFile);
 }
 
+void VeloxRuntime::setSplitPayloads(int32_t splitIndex, std::vector payloads) {
+  if (payloads.empty()) {
+    return;
+  }
+  splitPayloads_[splitIndex] = std::move(payloads);
+}
+
 void VeloxRuntime::getInfoAndIds(
     const std::unordered_map<facebook::velox::core::PlanNodeId, std::shared_ptr<SplitInfo>>& splitInfoMap,
     const std::unordered_set<facebook::velox::core::PlanNodeId>& leafPlanNodeIds,
@@ -398,7 +405,7 @@ std::string VeloxRuntime::planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) {
-  auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_));
+  auto veloxPlan = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_), splitPayloads_);
   return veloxPlan->toString(details, true);
 }
@@ -420,7 +427,7 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
       connectorIds_,
       *localWriteFilesTempPath(),
       *localWriteFileName());
-  veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_));
+  veloxPlan_ = veloxPlanConverter.toVeloxPlan(substraitPlan_, std::move(localFiles_), splitPayloads_);
   LOG_IF(INFO, debugModeEnabled_ && taskInfo_.has_value())
       << "############### Velox plan for task " << taskInfo_.value() << " ###############" << std::endl
       << veloxPlan_->toString(true, true);
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 37f4da33439b..cfba81db92ee 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -56,6 +56,8 @@ class VeloxRuntime final : public Runtime {
   void parseSplitInfo(const uint8_t* data, int32_t size, int32_t splitIndex) override;
 
+  void setSplitPayloads(int32_t splitIndex, std::vector payloads) override;
+
   VeloxMemoryManager* memoryManager() override;
 
   // FIXME This is not thread-safe?
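Editor's note: `setSplitPayloads` above deliberately skips empty vectors, so `splitPayloads_` stays sparse and a map miss during plan conversion simply means "no deletion vectors for that split". A hedged JVM-side mirror of that bookkeeping (object name hypothetical):

```scala
import scala.collection.mutable

// Hypothetical mirror of VeloxRuntime::setSplitPayloads: entries are keyed by
// the same split ordinal used by parseSplitInfo, and empty payload lists are
// not stored, so absence of a key means "no deletion vectors for this split".
object SplitPayloadTracker {
  private val payloadsBySplit = mutable.Map.empty[Int, Array[Array[Byte]]]

  def set(splitIndex: Int, payloads: Array[Array[Byte]]): Unit =
    if (payloads.nonEmpty) payloadsBySplit(splitIndex) = payloads

  def get(splitIndex: Int): Option[Array[Array[Byte]]] = payloadsBySplit.get(splitIndex)
}
```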
@@ -159,6 +161,7 @@ class VeloxRuntime final : public Runtime {
   std::unique_ptr spillExecutor_;
   std::unique_ptr ioExecutor_;
   VeloxConnectorIds connectorIds_;
+  std::unordered_map> splitPayloads_;
   std::unordered_map> emptySchemaBatchLoopUp_;
 };
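Editor's note: the iterator changes below parse two per-file metadata keys, `delta_dv_cardinality` and `row_index_filter_type`. A hedged Scala model of the filter-type convention (the sealed trait names are illustrative; the native enum is `DeltaRowIndexFilterType`):

```scala
// Illustrative model of the row_index_filter_type values handled below:
// IF_CONTAINED presumably drops rows whose row index is in the deletion
// vector (the normal DV read path), IF_NOT_CONTAINED keeps only those rows,
// and anything else keeps all rows.
sealed trait RowIndexFilter
case object KeepAll extends RowIndexFilter
case object IfContained extends RowIndexFilter
case object IfNotContained extends RowIndexFilter

def parseFilterType(metadata: Map[String, String]): RowIndexFilter =
  metadata.get("row_index_filter_type") match {
    case Some("IF_CONTAINED") => IfContained
    case Some("IF_NOT_CONTAINED") => IfNotContained
    case _ => KeepAll
  }
```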
"IF_NOT_CONTAINED") { + return gluten::delta::DeltaRowIndexFilterType::kIfNotContained; + } + return gluten::delta::DeltaRowIndexFilterType::kKeepAll; +} } // namespace @@ -131,7 +219,8 @@ WholeStageResultIterator::WholeStageResultIterator( throw std::runtime_error("Invalid scan information."); } - for (const auto& scanInfo : scanInfos) { + for (size_t scanInfoIdx = 0; scanInfoIdx < scanInfos.size(); ++scanInfoIdx) { + const auto& scanInfo = scanInfos[scanInfoIdx]; // Get the information for TableScan. // Partition index in scan info is not used. const auto& paths = scanInfo->paths; @@ -141,6 +230,13 @@ WholeStageResultIterator::WholeStageResultIterator( const auto& format = scanInfo->format; const auto& partitionColumns = scanInfo->partitionColumns; const auto& metadataColumns = scanInfo->metadataColumns; + const auto scanNodeConnectorId = connectorIdForScanNode(veloxPlan_, scanNodeIds_[scanInfoIdx]); + const bool isDeltaScan = scanNodeConnectorId == connectorIds_.delta || isDeltaScanInfo(scanInfo); + const auto deltaMetadataFiles = std::count_if( + metadataColumns.begin(), metadataColumns.end(), [](const auto& metadata) { return isDeltaMetadata(metadata); }); + LOG(INFO) << "WholeStageResultIterator scanInfo[" << scanInfoIdx << "] nodeId=" << scanNodeIds_[scanInfoIdx] + << " files=" << paths.size() << " connectorId=" << scanNodeConnectorId << " isDeltaScan=" << isDeltaScan + << " deltaMetadataFiles=" << deltaMetadataFiles; #ifdef GLUTEN_ENABLE_GPU // Under the pre-condition that all the split infos has same partition column and format. const auto canUseCudfConnector = scanInfo->canUseCudfConnector(); @@ -174,10 +270,29 @@ WholeStageResultIterator::WholeStageResultIterator( deleteFiles, metadataColumn, properties[idx]); + } else if (isDeltaScan) { + std::unordered_map customSplitInfo{{"table_format", kDeltaTableFormat}}; + split = std::make_shared( + connectorIds_.delta, + paths[idx], + format, + starts[idx], + lengths[idx], + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + std::unordered_map(), + true, + parseDeltaDeletionVector(metadataColumn, scanInfo->deletionVectorPayloads[idx]), + std::nullopt, + parseDeltaRowIndexFilterType(metadataColumn), + metadataColumn, + properties[idx]); } else { - auto connectorId = connectorIds_.hive; + auto connectorId = scanNodeConnectorId.empty() ? 
+        auto connectorId = scanNodeConnectorId.empty() ? connectorIds_.hive : scanNodeConnectorId;
 #ifdef GLUTEN_ENABLE_GPU
-        if (canUseCudfConnector && enableCudf_ &&
+        if (connectorId == connectorIds_.hive && canUseCudfConnector && enableCudf_ &&
             veloxCfg_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault)) {
           connectorId = connectorIds_.cudfHive;
         }
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index 5477176ce85f..6aabd1076aa8 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -19,6 +19,7 @@
 #include "TypeUtils.h"
 #include "VariantToVectorConverter.h"
+#include "compute/delta/DeltaConnector.h"
 #include "jni/JniHashTable.h"
 #include "operators/hashjoin/HashTableBuilder.h"
 #include "operators/plannodes/RowVectorStream.h"
@@ -46,6 +47,11 @@ using namespace cudf_velox::connector::hive;
 namespace gluten {
 namespace {
 
+const std::string kDeltaTableFormat = "delta";
+const std::string kTableFormatKey = "table_format";
+const std::string kDeltaDvCardinality = "delta_dv_cardinality";
+const std::string kRowIndexFilterType = "row_index_filter_type";
+
 bool useCudfTableHandle(const std::vector<std::shared_ptr<SplitInfo>>& splitInfos) {
 #ifdef GLUTEN_ENABLE_GPU
   if (splitInfos.empty()) {
@@ -57,6 +63,21 @@ bool useCudfTableHandle(const std::vector<std::shared_ptr<SplitInfo>>& splitInfo
 #endif
 }
 
+bool isDeltaMetadata(const std::unordered_map<std::string, std::string>& metadata) {
+  auto tableFormatIt = metadata.find(kTableFormatKey);
+  return (tableFormatIt != metadata.end() && tableFormatIt->second == kDeltaTableFormat) ||
+      metadata.find(kDeltaDvCardinality) != metadata.end() || metadata.find(kRowIndexFilterType) != metadata.end();
+}
+
+bool isDeltaSplitInfo(const std::shared_ptr<SplitInfo>& splitInfo) {
+  for (const auto& metadata : splitInfo->metadataColumns) {
+    if (isDeltaMetadata(metadata)) {
+      return true;
+    }
+  }
+  return false;
+}
+
 core::SortOrder toSortOrder(const ::substrait::SortField& sortField) {
   switch (sortField.direction()) {
     case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST:
@@ -1573,8 +1594,9 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
   connector::ConnectorTableHandlePtr tableHandle;
   auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr;
 
-  auto connectorId = connectorIds_.hive;
-  if (useCudfTableHandle(splitInfos_) && veloxCfg_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
+  auto connectorId = isDeltaSplitInfo(splitInfo) ? connectorIds_.delta : connectorIds_.hive;
+  if (connectorId == connectorIds_.hive && useCudfTableHandle(splitInfos_) &&
+      veloxCfg_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
      veloxCfg_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
 #ifdef GLUTEN_ENABLE_GPU
     connectorId = connectorIds_.cudfHive;
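Editor's note: the plan converter above and the result iterator earlier key connector routing off the same split-level metadata. A compact Scala restatement of that detection rule (the helper name is hypothetical):

```scala
// A split is routed to the Delta connector when its constant metadata carries
// an explicit table_format marker or any deletion-vector key.
def isDeltaSplit(metadata: Map[String, String]): Boolean =
  metadata.get("table_format").contains("delta") ||
    metadata.contains("delta_dv_cardinality") ||
    metadata.contains("row_index_filter_type")
```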
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 373601916d4d..f8dad3ed1585 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -19,6 +19,7 @@
 #include "SubstraitToVeloxExpr.h"
 #include "TypeUtils.h"
+#include "compute/Runtime.h"
 #include "compute/VeloxConnectorIds.h"
 #include "velox/connectors/hive/FileProperties.h"
 #include "velox/connectors/hive/TableHandle.h"
@@ -50,6 +51,9 @@ struct SplitInfo {
   /// The metadata columns associated with the partitioned table.
   std::vector<std::unordered_map<std::string, std::string>> metadataColumns;
 
+  /// Optional externally provided deletion vector payloads, aligned with metadataColumns.
+  std::vector> deletionVectorPayloads;
+
   /// The file paths to be scanned.
   std::vector<std::string> paths;
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
index d25137184112..3734222a08fd 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
@@ -31,11 +31,17 @@
 public class ColumnarBatchOutIterator extends ClosableIterator<ColumnarBatch> implements RuntimeAware {
   private final Runtime runtime;
   private final long iterHandle;
+  private final Object retainedReference;
 
   public ColumnarBatchOutIterator(Runtime runtime, long iterHandle) {
+    this(runtime, iterHandle, null);
+  }
+
+  public ColumnarBatchOutIterator(Runtime runtime, long iterHandle, Object retainedReference) {
     super();
     this.runtime = runtime;
     this.iterHandle = iterHandle;
+    this.retainedReference = retainedReference;
   }
 
   @Override
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
index 6d2c90896b2a..d94409c4777d 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java
@@ -27,6 +27,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -78,17 +79,30 @@ public ColumnarBatchOutIterator createKernelWithBatchIterator(
       int partitionIndex,
       String spillDirPath)
       throws RuntimeException {
+    return createKernelWithBatchIterator(
+        wsPlan, splitInfo, null, iterList, partitionIndex, spillDirPath);
+  }
+
+  public ColumnarBatchOutIterator createKernelWithBatchIterator(
+      byte[] wsPlan,
+      byte[][] splitInfo,
+      ByteBuffer[][] splitPayloads,
+      ColumnarBatchInIterator[] iterList,
+      int partitionIndex,
+      String spillDirPath)
+      throws RuntimeException {
     final long itrHandle =
         jniWrapper.nativeCreateKernelWithIterator(
             wsPlan,
             splitInfo,
+            splitPayloads,
             iterList,
             TaskContext.get().stageId(),
             partitionIndex, // TaskContext.getPartitionId(),
             TaskContext.get().taskAttemptId(),
             DebugUtil.isDumpingEnabledForTask(),
             spillDirPath);
-    final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle);
+    final ColumnarBatchOutIterator out = createOutIterator(runtime, itrHandle, splitPayloads);
     runtime
         .memoryManager()
         .addSpiller(
@@ -110,7 +124,8 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
     return out;
   }
 
-  private ColumnarBatchOutIterator createOutIterator(Runtime runtime, long itrHandle) {
-    return new ColumnarBatchOutIterator(runtime, itrHandle);
+  private ColumnarBatchOutIterator createOutIterator(
+      Runtime runtime, long itrHandle, Object retainedReference) {
+    return new ColumnarBatchOutIterator(runtime, itrHandle, retainedReference);
   }
 }
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
index a80829067987..c68aab4d757b 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java
@@ -20,6 +20,8 @@ import
org.apache.gluten.runtime.RuntimeAware; import org.apache.gluten.validate.NativePlanValidationInfo; +import java.nio.ByteBuffer; + /** * This class is implemented in JNI. This provides the Java interface to invoke functions in JNI. * This file is used to generate the .h files required for jni. Avoid all external dependencies in @@ -72,6 +74,7 @@ public long rtHandle() { public native long nativeCreateKernelWithIterator( byte[] wsPlan, byte[][] splitInfo, + ByteBuffer[][] splitPayloads, ColumnarBatchInIterator[] batchItr, int stageId, int partitionId, diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala index 1be03dd404ad..160dbdbe7075 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala @@ -18,16 +18,27 @@ package org.apache.gluten.execution import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat +import org.apache.gluten.utils.DeltaDeletionVectorRegistry +import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.connector.read.streaming.SparkDataStream +import org.apache.spark.sql.delta.actions.AddFile +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor +import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, StoredBitmap} +import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex +import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.types.StructType import org.apache.spark.util.collection.BitSet +import org.apache.hadoop.fs.Path + +import scala.util.control.NonFatal + case class DeltaScanTransformer( @transient override val relation: HadoopFsRelation, @transient stream: Option[SparkDataStream], @@ -55,16 +66,15 @@ case class DeltaScanTransformer( override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat - override protected def doValidateInternal(): ValidationResult = { - if ( - requiredSchema.fields.exists( - _.name == "__delta_internal_is_row_deleted") || requiredSchema.fields.exists( - _.name == "__delta_internal_row_index") - ) { - return ValidationResult.failed(s"Deletion vector is not supported in native.") - } + private lazy val deltaDeletionVectorRegistryId: Option[String] = + DeltaScanTransformer.registerDeletionVectorsFromFileFormat(relation) + + override protected def doValidateInternal(): ValidationResult = super.doValidateInternal() - super.doValidateInternal() + override def getProperties: Map[String, String] = { + super.getProperties ++ deltaDeletionVectorRegistryId + .map(DeltaDeletionVectorRegistry.RegistryIdProperty -> _) + .toMap } override def doCanonicalize(): DeltaScanTransformer = { @@ -90,6 +100,176 @@ case class DeltaScanTransformer( } object DeltaScanTransformer { + private val IfContainedFilterType = "IF_CONTAINED" + + private def registerDeletionVectorsFromFileFormat(relation: HadoopFsRelation): Option[String] = { + registerDeletionVectorsFromBroadcastMap(relation) + .orElse(registerDeletionVectorsFromPreparedScan(relation)) 
+ } + + private def registerDeletionVectorsFromBroadcastMap( + relation: HadoopFsRelation): Option[String] = { + try { + val format = relation.fileFormat + val formatClass = format.getClass + val broadcastDvMap = Option(formatClass.getMethod("broadcastDvMap").invoke(format)) + .collect { case o: Option[_] => o } + .flatten + .collect { case b: Broadcast[_] => b.value } + .collect { case m: scala.collection.Map[_, _] => m } + .getOrElse(Map.empty) + .collect { case (uri: java.net.URI, value) => uri -> value } + if (broadcastDvMap.isEmpty) { + return None + } + + val tablePath = + Option(formatClass.getMethod("tablePath").invoke(format)) + .collect { case o: Option[_] => o } + .flatten + .map(_.toString) + .orElse(relation.location.rootPaths.headOption.map(_.toString)) + .map(new Path(_)) + .orNull + if (tablePath == null) { + return None + } + + val dvStore = new HadoopFileSystemDVStore(relation.sparkSession.sessionState.newHadoopConf()) + val registeredEntries = broadcastDvMap.iterator.flatMap { + case (uri, dvDescriptorWithFilterType) => + try { + val descriptor = dvDescriptorWithFilterType.getClass + .getMethod("descriptor") + .invoke(dvDescriptorWithFilterType) + .asInstanceOf[DeletionVectorDescriptor] + val filterType = dvDescriptorWithFilterType.getClass + .getMethod("filterType") + .invoke(dvDescriptorWithFilterType) + .toString + val payload = StoredBitmap + .create(descriptor, tablePath) + .load(dvStore) + .serializeAsByteArray(RoaringBitmapArrayFormat.Portable) + pathAliases(uri).map { + _ -> DeltaDeletionVectorRegistry.Entry(descriptor.cardinality, filterType, payload) + } + } catch { + case NonFatal(_) => Seq.empty + } + }.toMap + + if (registeredEntries.isEmpty) { + None + } else { + Some(DeltaDeletionVectorRegistry.register(registeredEntries)) + } + } catch { + case _: NoSuchMethodException => None + case NonFatal(_) => None + } + } + + private def registerDeletionVectorsFromPreparedScan( + relation: HadoopFsRelation): Option[String] = { + relation.location match { + case preparedIndex: PreparedDeltaFileIndex => + val tablePath = + Option(preparedIndex.path) + .orElse(relation.location.rootPaths.headOption) + .orNull + if (tablePath == null) { + return None + } + + val dvStore = + new HadoopFileSystemDVStore(relation.sparkSession.sessionState.newHadoopConf()) + val preparedFiles = preparedIndex.preparedScan.files + registerDeletionVectorsFromAddFiles(preparedFiles.iterator, tablePath, dvStore) + case _ => + None + } + } + + private def registerDeletionVectorsFromAddFiles( + files: Iterator[AddFile], + tablePath: Path, + dvStore: HadoopFileSystemDVStore): Option[String] = { + val registeredEntries = files.flatMap { + addFile => + Option(addFile.deletionVector).iterator.flatMap { + descriptor => + try { + val payload = StoredBitmap + .create(descriptor, tablePath) + .load(dvStore) + .serializeAsByteArray(RoaringBitmapArrayFormat.Portable) + val absolutePath = new Path(tablePath, addFile.path) + pathAliases(absolutePath.toUri, absolutePath.toString).map { + _ -> DeltaDeletionVectorRegistry.Entry( + descriptor.cardinality, + IfContainedFilterType, + payload) + } + } catch { + case NonFatal(_) => Seq.empty + } + } + }.toMap + + if (registeredEntries.isEmpty) { + None + } else { + Some(DeltaDeletionVectorRegistry.register(registeredEntries)) + } + } + + private def pathAliases(uri: java.net.URI, extraAliases: String*): Seq[String] = { + val decodedExtraAliases = extraAliases.map(percentUnescapePathName) + (Seq(uri.toASCIIString, uri.getPath, 
Option(uri.getPath).map(_.stripPrefix("/")).orNull) ++ + extraAliases ++ + decodedExtraAliases ++ + extraAliases.map(_.stripPrefix("/")) ++ + decodedExtraAliases.map(_.stripPrefix("/"))) + .filter(_ != null) + .map(DeltaDeletionVectorRegistry.normalizePathKey) + .filter(_.nonEmpty) + .distinct + } + + private def percentUnescapePathName(path: String): String = { + if (path == null || path.isEmpty) { + return path + } + var plaintextEndIdx = path.indexOf('%') + val length = path.length + if (plaintextEndIdx == -1 || plaintextEndIdx + 2 >= length) { + path + } else { + val sb = new java.lang.StringBuilder(length) + var plaintextStartIdx = 0 + while (plaintextEndIdx != -1 && plaintextEndIdx + 2 < length) { + if (plaintextEndIdx > plaintextStartIdx) sb.append(path, plaintextStartIdx, plaintextEndIdx) + if ( + java.lang.Character.digit(path.charAt(plaintextEndIdx + 1), 16) != -1 && + java.lang.Character.digit(path.charAt(plaintextEndIdx + 2), 16) != -1 + ) { + sb.append( + ((java.lang.Character.digit(path.charAt(plaintextEndIdx + 1), 16) << 4) | + java.lang.Character.digit(path.charAt(plaintextEndIdx + 2), 16)).toChar) + plaintextStartIdx = plaintextEndIdx + 3 + } else { + sb.append('%') + plaintextStartIdx = plaintextEndIdx + 1 + } + plaintextEndIdx = path.indexOf('%', plaintextStartIdx) + } + if (plaintextStartIdx < length) { + sb.append(path, plaintextStartIdx, length) + } + sb.toString + } + } def apply(scanExec: FileSourceScanExec): DeltaScanTransformer = { new DeltaScanTransformer( diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala index e16a6d12fdab..f6a414db0f3b 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala @@ -16,28 +16,37 @@ */ package org.apache.gluten.extension -import org.apache.gluten.execution.{DeltaScanTransformer, ProjectExecTransformer} +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.execution.{DeltaScanTransformer, FilterExecTransformerBase, ProjectExecTransformer} import org.apache.gluten.extension.columnar.transition.RemoveTransitions import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CreateNamedStruct, Expression, GetStructField, If, InputFileBlockLength, InputFileBlockStart, InputFileName, IsNull, LambdaFunction, Literal, NamedLambdaVariable} -import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, TransformKeys, TransformValues} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, NamedExpression} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, NoMapping} -import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.FileFormat -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} +import org.apache.spark.sql.types.StructType -import scala.collection.mutable import scala.collection.mutable.ListBuffer object DeltaPostTransformRules { def rules: Seq[Rule[SparkPlan]] = - RemoveTransitions 
:: pushDownInputFileExprRule :: columnMappingRule :: Nil + RemoveTransitions :: + nativeDeletionVectorRule :: + pushDownInputFileExprRule :: + columnMappingRule :: Nil + + private val deletionVectorDeletedRowColumnName = "__delta_internal_is_row_deleted" + private val deletionVectorRowIndexColumnName = "__delta_internal_row_index" + private val deletionVectorInternalColumnNames = + Set(deletionVectorDeletedRowColumnName, deletionVectorRowIndexColumnName) private val COLUMN_MAPPING_RULE_TAG: TreeNodeTag[String] = TreeNodeTag[String]("org.apache.gluten.delta.column.mapping") + private val PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG: TreeNodeTag[Boolean] = + TreeNodeTag[Boolean]("org.apache.gluten.delta.preserve.deletion.vector.row.index") private def notAppliedColumnMappingRule(plan: SparkPlan): Boolean = { plan.getTagValue(COLUMN_MAPPING_RULE_TAG).isEmpty @@ -65,6 +74,87 @@ object DeltaPostTransformRules { child.copy(output = p.output) } + /** + * Spark Delta injects synthetic deletion-vector predicates and columns into the plan. Those are + * needed for the JVM reader path, but for the native Delta scan path they must be stripped or + * they will be applied twice with incompatible semantics. + */ + val nativeDeletionVectorRule: Rule[SparkPlan] = (plan: SparkPlan) => { + tagRowIndexRequiredSubtrees(plan) + plan.transformUp { + case scan: DeltaScanTransformer => + val cleanedDataFilters = scan.dataFilters.flatMap(stripDeletionVectorPredicate) + val cleanedPushDownFilters = + scan.pushDownFilters.map(_.flatMap(stripDeletionVectorPredicate)) + val preserveRowIndex = shouldPreserveDeletionVectorRowIndex(scan) + val cleanedOutput = stripDeletionVectorInternalOutput(scan.output, preserveRowIndex) + val cleanedRequiredSchema = + stripDeletionVectorInternalSchema(scan.requiredSchema, preserveRowIndex) + if ( + cleanedDataFilters == scan.dataFilters && + cleanedPushDownFilters == scan.pushDownFilters && + cleanedOutput == scan.output && + cleanedRequiredSchema == scan.requiredSchema + ) { + scan + } else { + scan.copy( + output = cleanedOutput, + requiredSchema = cleanedRequiredSchema, + dataFilters = cleanedDataFilters, + pushDownFilters = cleanedPushDownFilters) + } + case project: ProjectExecTransformer if containsNativeDeltaScan(project.child) => + val cleanedProjectList = stripDeletionVectorInternalProjectList( + project.projectList, + shouldPreserveDeletionVectorRowIndex(project)) + if (cleanedProjectList == project.projectList) { + project + } else if (cleanedProjectList.isEmpty) { + project.child + } else { + ProjectExecTransformer(cleanedProjectList, project.child) + } + case project: ProjectExec if containsNativeDeltaScan(project.child) => + val cleanedProjectList = stripDeletionVectorInternalProjectList( + project.projectList, + shouldPreserveDeletionVectorRowIndex(project)) + if (cleanedProjectList == project.projectList) { + project + } else if (cleanedProjectList.isEmpty) { + project.child + } else { + ProjectExec(cleanedProjectList, project.child) + } + case filter: FilterExecTransformerBase if containsNativeDeltaScan(filter.child) => + stripDeletionVectorPredicate(filter.cond) match { + case Some(cleanCondition) if cleanCondition != filter.cond => + BackendsApiManager.getSparkPlanExecApiInstance + .genFilterExecTransformer(cleanCondition, filter.child) + case Some(_) => + filter + case None => + filter.child + } + case filter: FilterExec if containsNativeDeltaScan(filter.child) => + stripDeletionVectorPredicate(filter.condition) match { + case Some(cleanCondition) if cleanCondition != 
filter.condition => + FilterExec(cleanCondition, filter.child) + case Some(_) => + filter + case None => + filter.child + } + } + } + + private def containsNativeDeltaScan(plan: SparkPlan): Boolean = { + plan.exists { + case _: DeltaScanTransformer => true + case _ => false + } + } + private def isDeltaColumnMappingFileFormat(fileFormat: FileFormat): Boolean = fileFormat match { case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping => true @@ -79,6 +169,82 @@ object DeltaPostTransformRules { } } + private def referencesDeletionVectorInternalColumn(expr: Expression): Boolean = { + expr.references.exists(attr => deletionVectorInternalColumnNames.contains(attr.name)) + } + + private def referencesDeletionVectorRowIndex(expr: Expression): Boolean = { + expr.references.exists(_.name == deletionVectorRowIndexColumnName) + } + + private def tagRowIndexRequiredSubtrees(plan: SparkPlan): Unit = { + def tagSubtree(subtree: SparkPlan): Unit = { + subtree.foreach(_.setTagValue(PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG, true)) + } + + def visit(node: SparkPlan): Unit = { + val shouldPreserveRowIndex = + node.expressions.exists(containsIncrementMetricExpr) || + node.expressions.exists(referencesDeletionVectorRowIndex) + if (shouldPreserveRowIndex) { + node.children.foreach(tagSubtree) + } + node.children.foreach(visit) + } + + visit(plan) + } + + private def shouldPreserveDeletionVectorRowIndex(plan: SparkPlan): Boolean = { + plan.getTagValue(PRESERVE_DELETION_VECTOR_ROW_INDEX_TAG).contains(true) || + plan.expressions.exists(containsIncrementMetricExpr) || + plan.expressions.exists(referencesDeletionVectorRowIndex) + } + + private def shouldStripDeletionVectorInternalColumn( + columnName: String, + preserveRowIndex: Boolean): Boolean = { + columnName == deletionVectorDeletedRowColumnName || + (!preserveRowIndex && columnName == deletionVectorRowIndexColumnName) + } + + private def stripDeletionVectorInternalOutput( + output: Seq[Attribute], + preserveRowIndex: Boolean): Seq[Attribute] = { + output.filterNot(attr => shouldStripDeletionVectorInternalColumn(attr.name, preserveRowIndex)) + } + + private def stripDeletionVectorInternalProjectList( + projectList: Seq[NamedExpression], + preserveRowIndex: Boolean): Seq[NamedExpression] = { + projectList.filterNot( + expr => shouldStripDeletionVectorInternalColumn(expr.name, preserveRowIndex)) + } + + private def stripDeletionVectorInternalSchema( + schema: StructType, + preserveRowIndex: Boolean): StructType = { + StructType( + schema.filterNot( + field => shouldStripDeletionVectorInternalColumn(field.name, preserveRowIndex))) + } + + private def stripDeletionVectorPredicate(expr: Expression): Option[Expression] = { + expr match { + case And(left, right) => + (stripDeletionVectorPredicate(left), stripDeletionVectorPredicate(right)) match { + case (Some(cleanLeft), Some(cleanRight)) => Some(And(cleanLeft, cleanRight)) + case (Some(cleanLeft), None) => Some(cleanLeft) + case (None, Some(cleanRight)) => Some(cleanRight) + case (None, None) => None + } + case other if referencesDeletionVectorInternalColumn(other) => + None + case other => + Some(other) + } + } + private def isInputFileRelatedAttribute(attr: Attribute): Boolean = { attr match { case AttributeReference(name, _, _, _) => @@ -96,73 +262,6 @@ object DeltaPostTransformRules { } } - /** - * Checks whether two structurally compatible DataTypes have different struct field names at any - * nesting level. 
- */ - private def nestedFieldNamesDiffer(logical: DataType, physical: DataType): Boolean = { - (logical, physical) match { - case (l: StructType, p: StructType) if l.length == p.length => - l.zip(p).exists { - case (lf, pf) => - lf.name != pf.name || nestedFieldNamesDiffer(lf.dataType, pf.dataType) - } - case (l: ArrayType, p: ArrayType) => - nestedFieldNamesDiffer(l.elementType, p.elementType) - case (l: MapType, p: MapType) => - nestedFieldNamesDiffer(l.keyType, p.keyType) || - nestedFieldNamesDiffer(l.valueType, p.valueType) - case _ => false - } - } - - /** - * Rebuilds an expression tree so that nested struct field names match the logical schema. Uses - * positional extraction (GetStructField) and reconstruction (CreateNamedStruct) instead of Cast, - * so correctness does not depend on Velox's cast_match_struct_by_name config. - */ - private def reconcileFieldNames( - expr: Expression, - logical: DataType, - physical: DataType): Expression = { - (logical, physical) match { - case (l: StructType, p: StructType) if l.length == p.length => - val rebuiltFields = l.zip(p).zipWithIndex.flatMap { - case ((lf, pf), i) => - val extracted = GetStructField(expr, i, None) - val reconciled = reconcileFieldNames(extracted, lf.dataType, pf.dataType) - Seq(Literal(lf.name), reconciled) - } - val rebuilt = CreateNamedStruct(rebuiltFields) - If(IsNull(expr), Literal.create(null, l), rebuilt) - case (l: ArrayType, p: ArrayType) if nestedFieldNamesDiffer(l.elementType, p.elementType) => - val lambdaVar = NamedLambdaVariable("element", p.elementType, p.containsNull) - val body = reconcileFieldNames(lambdaVar, l.elementType, p.elementType) - ArrayTransform(expr, LambdaFunction(body, Seq(lambdaVar))) - case (l: MapType, p: MapType) => - val needKeys = nestedFieldNamesDiffer(l.keyType, p.keyType) - val needValues = nestedFieldNamesDiffer(l.valueType, p.valueType) - var result = expr - if (needValues) { - val keyVar = NamedLambdaVariable("key", p.keyType, false) - val valueVar = NamedLambdaVariable("value", p.valueType, p.valueContainsNull) - val body = reconcileFieldNames(valueVar, l.valueType, p.valueType) - result = TransformValues(result, LambdaFunction(body, Seq(keyVar, valueVar))) - } - if (needKeys) { - val keyVar = NamedLambdaVariable("key", p.keyType, false) - val valueVar = NamedLambdaVariable( - "value", - if (needValues) l.valueType else p.valueType, - p.valueContainsNull) - val body = reconcileFieldNames(keyVar, l.keyType, p.keyType) - result = TransformKeys(result, LambdaFunction(body, Seq(keyVar, valueVar))) - } - result - case _ => expr - } - } - /** * This method is only used for Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping) * transform the metadata of Delta into Parquet's, each plan should only be transformed once. 
@@ -185,9 +284,8 @@ object DeltaPostTransformRules { )(SparkSession.active) // transform output's name into physical name so Reader can read data correctly // should keep the columns order the same as the origin output - case class ColumnMapping(logicalName: String, logicalType: DataType, physicalAttr: Attribute) - val columnMappings = ListBuffer.empty[ColumnMapping] - val seenNames = mutable.Set.empty[String] + val originColumnNames = ListBuffer.empty[String] + val transformedAttrs = ListBuffer.empty[Attribute] def mapAttribute(attr: Attribute) = { val newAttr = if (plan.isMetadataColumn(attr)) { attr @@ -198,8 +296,9 @@ object DeltaPostTransformRules { .createPhysicalAttributes(Seq(attr), fmt.referenceSchema, fmt.columnMappingMode) .head } - if (seenNames.add(attr.name)) { - columnMappings += ColumnMapping(attr.name, attr.dataType, newAttr) + if (!originColumnNames.contains(attr.name)) { + transformedAttrs += newAttr + originColumnNames += attr.name } newAttr } @@ -239,20 +338,9 @@ object DeltaPostTransformRules { scanExecTransformer.copyTagsFrom(plan) tagColumnMappingRule(scanExecTransformer) - // Alias physical names back to logical names. For struct-typed columns, Delta column - // mapping renames internal field names to physical UUIDs. A top-level Alias only restores - // the column name, not the struct's internal field names. We rebuild the struct with - // logical field names using positional extraction (GetStructField/CreateNamedStruct) - // instead of Cast, so correctness does not depend on any Velox cast config. - val expr = columnMappings.map { - cm => - val projectedExpr: Expression = - if (nestedFieldNamesDiffer(cm.logicalType, cm.physicalAttr.dataType)) { - reconcileFieldNames(cm.physicalAttr, cm.logicalType, cm.physicalAttr.dataType) - } else { - cm.physicalAttr - } - Alias(projectedExpr, cm.logicalName)(exprId = cm.physicalAttr.exprId) + // alias physicalName into tableName + val expr = (transformedAttrs, originColumnNames).zipped.map { + (attr, columnName) => Alias(attr, columnName)(exprId = attr.exprId) } val projectExecTransformer = ProjectExecTransformer(expr.toSeq, scanExecTransformer) projectExecTransformer diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala index 5fe1b4ba86e3..baa7d42cf6dd 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/OffloadDeltaScan.scala @@ -17,16 +17,90 @@ package org.apache.gluten.extension import org.apache.gluten.execution.DeltaScanTransformer +import org.apache.gluten.extension.columnar.FallbackTags import org.apache.gluten.extension.columnar.offload.OffloadSingleNode import org.apache.spark.sql.delta.DeltaParquetFileFormat +import org.apache.spark.sql.delta.SnapshotDescriptor +import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable +import org.apache.spark.sql.delta.files.TahoeFileIndex +import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} +import org.apache.spark.util.SparkVersionUtil case class OffloadDeltaScan() extends OffloadSingleNode { + private val deletionVectorsUseMetadataRowIndexKey = + "spark.databricks.delta.deletionVectors.useMetadataRowIndex" + override def offload(plan: SparkPlan): SparkPlan = plan match { - case scan: FileSourceScanExec - if scan.relation.fileFormat.getClass == 
classOf[DeltaParquetFileFormat] => + case scan: FileSourceScanExec if isDeltaLogScan(scan) => + FallbackTags.add(scan, "fallback Delta _delta_log scan") + scan + case scan: FileSourceScanExec if shouldFallbackSpark34DeletionVectorScan(scan) => + FallbackTags.add(scan, "fallback Spark 3.4 Delta DV scan") + scan + case scan: FileSourceScanExec if shouldFallbackDeletionVectorScan(scan) => + FallbackTags.add(scan, "fallback Delta DV scan without metadata row index") + scan + case scan: FileSourceScanExec if isDeltaScan(scan) => DeltaScanTransformer(scan) case other => other } + + private def isDeltaScan(scan: FileSourceScanExec): Boolean = { + isDeltaFileIndex(scan) || isDeltaParquetScan(scan) + } + + private def isDeltaParquetScan(scan: FileSourceScanExec): Boolean = { + val fileFormatClass = scan.relation.fileFormat.getClass + fileFormatClass == classOf[DeltaParquetFileFormat] || + fileFormatClass.getSimpleName == "GlutenDeltaParquetFileFormat" + } + + private def isDeltaFileIndex(scan: FileSourceScanExec): Boolean = { + scan.relation.location.isInstanceOf[TahoeFileIndex] || + scan.relation.location.isInstanceOf[PreparedDeltaFileIndex] + } + + private def isDeltaLogScan(scan: FileSourceScanExec): Boolean = { + scan.relation.location.rootPaths.exists { + path => + val root = path.toString + root.contains("/_delta_log") || root.contains("\\_delta_log") || root.endsWith("_delta_log") + } + } + + private def shouldFallbackDeletionVectorScan(scan: FileSourceScanExec): Boolean = { + val useMetadataRowIndex = + scan.relation.sparkSession.sessionState.conf + .getConfString(deletionVectorsUseMetadataRowIndexKey, "true") + .toBoolean + if (useMetadataRowIndex) { + return false + } + + scan.relation.location match { + case index: TahoeFileIndex => + val snapshot = index.asInstanceOf[SnapshotDescriptor] + deletionVectorsReadable(snapshot.protocol, snapshot.metadata) + case _ => + false + } + } + + private def shouldFallbackSpark34DeletionVectorScan(scan: FileSourceScanExec): Boolean = { + if (SparkVersionUtil.gteSpark35) { + return false + } + + scan.relation.location match { + case preparedIndex: PreparedDeltaFileIndex => + preparedIndex.preparedScan.files.exists(_.deletionVector != null) + case index: TahoeFileIndex => + val snapshot = index.asInstanceOf[SnapshotDescriptor] + deletionVectorsReadable(snapshot.protocol, snapshot.metadata) + case _ => + false + } + } } diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala index 031bf460347d..fda594ef84d5 100644 --- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala +++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala @@ -18,7 +18,10 @@ package org.apache.gluten.execution import org.apache.spark.SparkConf import org.apache.spark.sql.Row +import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.types._ +import org.apache.spark.util.SparkVersionUtil import scala.collection.JavaConverters._ @@ -37,6 +40,7 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { .set("spark.memory.offHeap.size", "2g") .set("spark.unsafe.exceptionOnMemoryLeak", "true") .set("spark.sql.autoBroadcastJoinThreshold", "-1") + .set("spark.sql.ansi.enabled", "false") .set("spark.sql.sources.useV1SourceList", "avro") .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .set("spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.delta.catalog.DeltaCatalog") @@ -209,16 +213,40 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)") checkAnswer(spark.read.format("delta").load(path), df1.union(df2)) spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (${values2.mkString(", ")})") - import org.apache.spark.sql.execution.GlutenImplicits._ val df = spark.read.format("delta").load(path) - assert( - df.fallbackSummary.fallbackNodeToReason - .flatMap(_.values) - .exists(_.contains("Deletion vector is not supported in native"))) + val executedPlan = df.queryExecution.executedPlan + if (SparkVersionUtil.gteSpark35) { + assert(executedPlan.collect { case _: DeltaScanTransformer => true }.nonEmpty) + val planText = executedPlan.toString() + assert(!planText.contains("__delta_internal_is_row_deleted")) + assert(!planText.contains("__delta_internal_row_index")) + } else { + assert(executedPlan.collect { case _: DeltaScanTransformer => true }.isEmpty) + } checkAnswer(df, df1) } } + testWithMinSparkVersion("delta: _delta_log scan should fallback", "3.4") { + withTempPath { + p => + import testImplicits._ + val path = p.getCanonicalPath + Seq((1, "a"), (2, "b")).toDF("id", "value").write.format("delta").save(path) + + val deltaLogDf = spark.read.json(s"$path/_delta_log/*.json") + val executedPlan = deltaLogDf.queryExecution.executedPlan + + assert(executedPlan.collect { case _: FileSourceScanExecTransformerBase => true }.isEmpty) + assert(executedPlan.collect { case _: BatchScanExecTransformerBase => true }.isEmpty) + assert(executedPlan.collect { + case _: FileSourceScanExec => true + case _: BatchScanExec => true + }.nonEmpty) + assert(deltaLogDf.count() > 0) + } + } + testWithMinSparkVersion("delta: push down input_file_name expression", "3.2") { withTable("source_table") { withTable("target_table") { @@ -320,13 +348,13 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { withSQLConf("spark.gluten.sql.columnar.scanOnly" -> "true") { withTable("delta_pf") { spark.sql(s""" - |create table test (id int, name string) using delta + |create table delta_pf (id int, name string) using delta |""".stripMargin) spark.sql(s""" - |insert into test values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2") + |insert into delta_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2") |""".stripMargin) runQueryAndCompare( - "select id from test where name > 'v1'", + "select id from delta_pf where name > 'v1'", compareResult = true, noFallBack = false) { df => diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/DeltaDeletionVectorRegistry.scala b/gluten-substrait/src/main/scala/org/apache/gluten/utils/DeltaDeletionVectorRegistry.scala new file mode 100644 index 000000000000..a6f9bcb96125 --- /dev/null +++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/DeltaDeletionVectorRegistry.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+
+object DeltaDeletionVectorRegistry {
+  val RegistryIdProperty = "gluten.delta.dv.registry.id"
+
+  final case class Entry(cardinality: Long, filterType: String, payload: Array[Byte])
+    extends Serializable
+
+  private val registry = new ConcurrentHashMap[String, Map[String, Entry]]()
+
+  def register(entries: Map[String, Entry]): String = {
+    val id = UUID.randomUUID().toString
+    registry.put(id, entries)
+    id
+  }
+
+  def get(id: String): Option[Map[String, Entry]] = Option(registry.get(id))
+
+  def normalizePathKey(path: String): String = {
+    path.replace('\\', '/').stripSuffix("/")
+  }
+}
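Editor's note: a usage sketch for the registry above. The driver registers per-file entries keyed by normalized path and ships only the registry id through the scan's `RegistryIdProperty`; the consumer side shown here is illustrative, and the table path, payload, and descriptor values are placeholders:

```scala
import org.apache.gluten.utils.DeltaDeletionVectorRegistry

// Placeholder bitmap bytes; in the real flow this is a RoaringBitmapArray
// serialized in the Portable format (see DeltaScanTransformer earlier).
val serializedBitmap: Array[Byte] = Array[Byte]()

// Producer side: register per-file entries and attach the id to the scan.
val entries = Map(
  DeltaDeletionVectorRegistry.normalizePathKey("s3://bucket/table/part-0001.parquet") ->
    DeltaDeletionVectorRegistry.Entry(
      cardinality = 2L,
      filterType = "IF_CONTAINED",
      payload = serializedBitmap))
val registryId = DeltaDeletionVectorRegistry.register(entries)

// Consumer side: resolve the id back to the per-path entries.
val dvByPath = DeltaDeletionVectorRegistry.get(registryId).getOrElse(Map.empty)
```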
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
index 0ef0b6a28c3b..2b36cac94b79 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
@@ -39,6 +39,9 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig, spark: SparkSession)
     if (!glutenConf.enableFallbackReport) {
       return plan
     }
+    if (GlutenFallbackReporter.containsInternalDeltaLogScan(plan)) {
+      return plan
+    }
     printFallbackReason(plan)
     if (GlutenUIUtils.uiEnabled(spark.sparkContext)) {
       postFallbackReason(plan)
@@ -96,3 +99,20 @@
     GlutenUIUtils.postEvent(sc, event)
   }
 }
+
+object GlutenFallbackReporter {
+  private[execution] def containsInternalDeltaLogScan(plan: SparkPlan): Boolean = {
+    plan.exists {
+      case scan: FileSourceScanExec =>
+        scan.relation.location.rootPaths.exists {
+          path =>
+            val root = path.toString
+            root.contains("/_delta_log") ||
+            root.contains("\\_delta_log") ||
+            root.endsWith("_delta_log")
+        }
+      case _ =>
+        false
+    }
+  }
+}
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala
index 30aac6a8f380..f2529da82f18 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.events.GlutenPlanFallbackEvent
 import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.catalyst.plans.logical.CommandResult
 import org.apache.spark.sql.execution.ui.{GlutenUIUtils, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
 
 import scala.collection.mutable
@@ -79,6 +80,9 @@ class GlutenQueryExecutionListener(sc: SparkContext) extends SparkListener with
     if (!enabledAtStart) {
       return
     }
+    if (shouldSkipInternalDeltaLogQuery(qe)) {
+      return
+    }
     val summary =
       GlutenImplicits.collectQueryExecutionFallbackSummary(qe.sparkSession, qe)
@@ -107,6 +111,15 @@ class GlutenQueryExecutionListener(sc: SparkContext) extends SparkListener with
           e)
     }
   }
+
+  private def shouldSkipInternalDeltaLogQuery(qe: QueryExecution): Boolean = {
+    qe.commandExecuted.exists {
+      case r: CommandResult =>
+        GlutenFallbackReporter.containsInternalDeltaLogScan(r.commandPhysicalPlan)
+      case _ =>
+        false
+    } || GlutenFallbackReporter.containsInternalDeltaLogScan(qe.executedPlan)
+  }
 }
 
 object GlutenQueryExecutionListener {