@@ -32,6 +32,7 @@ import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.GlutenDriverEndpoint
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.execution.HiveFileFormat
@@ -44,6 +45,8 @@ import org.apache.commons.lang3.math.NumberUtils

import java.util.{Map => JMap}

import scala.collection.JavaConverters._

class VeloxTransformerApi extends TransformerApi with Logging {

def genPartitionSeq(
@@ -131,11 +134,17 @@ class VeloxTransformerApi extends TransformerApi with Logging {
// Only Parquet is supported. It's safe to set a fixed "parquet" here
// because others already fell back by WriteFilesExecTransformer's validation.
val shortName = "parquet"
val writeOptions = Option(write.session).map {
session =>
val hadoopConf = session.sessionState.newHadoopConfWithOptions(write.options)
CaseInsensitiveMap(hadoopConf.iterator().asScala.map(
entry => entry.getKey -> entry.getValue).toMap)
}.getOrElse(write.caseInsensitiveOptions)
val nativeConf =
GlutenFormatFactory(shortName)
.nativeConf(
write.caseInsensitiveOptions,
WriteFilesExecTransformer.getCompressionCodec(write.caseInsensitiveOptions))
writeOptions,
WriteFilesExecTransformer.getCompressionCodec(writeOptions))
packPBMessage(
ConfigMap
.newBuilder()
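
The VeloxTransformerApi change above stops passing write.caseInsensitiveOptions directly and instead merges the session Hadoop configuration (which already carries spark.hadoop.* entries) with the per-write options. A minimal sketch of that resolution order follows, assuming the usual behaviour of SessionState.newHadoopConfWithOptions (session conf first, per-write options applied on top); the object and method names are illustrative, not part of the change.

import scala.collection.JavaConverters._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

object WriteOptionResolutionSketch {
  // Flatten the session Hadoop conf plus per-write options into one case-insensitive map,
  // mirroring what the native writer now receives.
  def resolve(
      spark: SparkSession,
      writeOptions: Map[String, String]): CaseInsensitiveMap[String] = {
    val hadoopConf = spark.sessionState.newHadoopConfWithOptions(writeOptions)
    CaseInsensitiveMap(hadoopConf.iterator().asScala.map(e => e.getKey -> e.getValue).toMap)
  }
}

Under that assumption, with spark.hadoop.parquet.enable.dictionary=false on the session, resolve(spark, Map.empty) reports "false" for parquet.enable.dictionary, while resolve(spark, Map("parquet.enable.dictionary" -> "true")) reports "true", because explicit write options are applied last.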
@@ -27,6 +27,10 @@ import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

import java.io.File

import scala.collection.JavaConverters._

class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteUtils {

override protected val resourcePath: String = "/tpch-data-parquet"
@@ -267,3 +271,53 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite with WriteUtils {
}
}
}

class VeloxParquetWriteHadoopConfSuite extends VeloxWholeStageTransformerSuite with WriteUtils {

override protected val resourcePath: String = ""
override protected val fileFormat: String = "parquet"

override protected def sparkConf: SparkConf = {
super.sparkConf
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
.set(s"spark.hadoop.parquet.enable.dictionary", "false")
}

private val dictionaryEncodingNames = Set("PLAIN_DICTIONARY", "RLE_DICTIONARY")

private def parquetColumnEncodings(dir: File): Seq[Set[String]] = {
val parquetFiles = dir.list((_, name) => name.contains("parquet"))
assert(parquetFiles.nonEmpty)
parquetFiles.flatMap {
file =>
val path = new Path(dir.getAbsolutePath, file)
val in = HadoopInputFile.fromPath(path, spark.sessionState.newHadoopConf())
Utils.tryWithResource(ParquetFileReader.open(in)) {
reader =>
reader.getFooter.getBlocks.asScala.flatMap {
block => block.getColumns.asScala.map(_.getEncodings.asScala.map(_.name()).toSet)
}
}
}.toSeq
}

test("native writer should respect parquet dictionary config from spark.hadoop config") {
spark
.range(0, 20000, 1, 1)
.selectExpr("concat('gluten-parquet-dictionary-', CAST(id % 10 AS STRING)) AS payload")
.createOrReplaceTempView("parquet_dictionary_source")

withTempPath {
hadoopConfDir =>
checkNativeWrite(
s"""
|INSERT OVERWRITE DIRECTORY USING PARQUET
|OPTIONS ('path' '${hadoopConfDir.getCanonicalPath}')
|SELECT * FROM parquet_dictionary_source
|""".stripMargin)
val columnEncodings = parquetColumnEncodings(hadoopConfDir)
assert(columnEncodings.nonEmpty)
assert(!columnEncodings.exists(_.exists(dictionaryEncodingNames.contains)))
}
}
}
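
The suite above pins the dictionary setting at the session level through spark.hadoop.*. Below is a hypothetical companion test, not part of the PR, written as if it sat inside the same suite so it can reuse parquetColumnEncodings and dictionaryEncodingNames; it assumes per-write options are layered on top of the session default, so the expected reappearance of dictionary encoding is an assumption rather than an observed result.

  // Hypothetical companion test (not part of the PR): a per-write option is assumed to
  // override the session-level spark.hadoop.parquet.enable.dictionary=false because
  // newHadoopConfWithOptions applies write options last.
  test("per-write parquet.enable.dictionary option should override the session default") {
    withTempPath {
      overrideDir =>
        checkNativeWrite(
          s"""
             |INSERT OVERWRITE DIRECTORY USING PARQUET
             |OPTIONS (
             |  'path' '${overrideDir.getCanonicalPath}',
             |  'parquet.enable.dictionary' 'true')
             |SELECT concat('gluten-parquet-dictionary-', CAST(id % 10 AS STRING)) AS payload
             |FROM range(0, 20000, 1, 1)
             |""".stripMargin)
        val columnEncodings = parquetColumnEncodings(overrideDir)
        assert(columnEncodings.nonEmpty)
        // Expected under the assumption above: at least one column chunk is dictionary-encoded.
        assert(columnEncodings.exists(_.exists(dictionaryEncodingNames.contains)))
    }
  }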
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources._
@@ -77,11 +77,18 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
if (sparkSession.sparkContext.getLocalProperty("isNativeApplicable") == "true") {
// Pass compression to job conf so that the file extension can be aware of it.
val conf = ContextUtil.getConfiguration(job)
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
val writeOptions = CaseInsensitiveMap(
sparkSession.sessionState
.newHadoopConfWithOptions(options)
.iterator()
.asScala
.map(entry => entry.getKey -> entry.getValue)
.toMap)
val parquetOptions = new ParquetOptions(writeOptions, sparkSession.sessionState.conf)
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
val nativeConf =
GlutenFormatFactory(shortName())
.nativeConf(options, parquetOptions.compressionCodecClassName)
.nativeConf(writeOptions, parquetOptions.compressionCodecClassName)

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
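
The ParquetFileFormat change above builds ParquetOptions from the merged Hadoop and write options, so a codec configured at the Hadoop level (for example via spark.hadoop.parquet.compression) can determine the compression handed to the native writer. A small sketch of the precedence ParquetOptions applies to such a merged map; ParquetOptions and CaseInsensitiveMap are Spark-internal classes, and their usability from outside the Spark source tree, like the codec value below, is an assumption made for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions

object CompressionPrecedenceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    // "parquet.compression" stands in for an entry merged from the Hadoop conf; an explicit
    // "compression" write option would still win, and spark.sql.parquet.compression.codec
    // is only the fallback when neither key is present.
    val merged = CaseInsensitiveMap(Map("parquet.compression" -> "zstd"))
    val codec = new ParquetOptions(merged, spark.sessionState.conf).compressionCodecClassName
    println(codec) // expected to print "ZSTD" given the precedence above
    spark.stop()
  }
}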