From 110f79593c67c58a2c2465bf2fb340ff4711003f Mon Sep 17 00:00:00 2001
From: Claire McGinty
Date: Tue, 19 Sep 2023 08:53:08 -0700
Subject: [PATCH] Propagate ParquetOutputFormat options to ParquetWriter (#4980)

* Propagate ParquetOutputFormat options to ParquetWriter

* fixup

* fixup

* cleanup test

* Apply configuration in scio-smb Parquet as well

* Add missing runtime dep to scio-smb

* add page size, writer version

* Add page properties

* javafmt

* remove unused import

* scope to test

* Fix build issues

---------

Co-authored-by: Michel Davit
---
 build.sbt                                     |   5 +
 .../com/spotify/scio/parquet/WriterUtils.java |  50 ++++++-
 .../scio/parquet/TestWriterUtils.scala        | 130 ++++++++++++++++++
 .../smb/ParquetAvroFileOperations.java        |  10 +-
 .../beam/sdk/extensions/smb/ParquetUtils.java |  86 ++++++++++++
 .../smb/ParquetTypeFileOperations.scala       |  20 ++-
 6 files changed, 279 insertions(+), 22 deletions(-)
 create mode 100644 scio-parquet/src/test/scala/com/spotify/scio/parquet/TestWriterUtils.scala
 create mode 100644 scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetUtils.java

diff --git a/build.sbt b/build.sbt
index 1fb475a7fd..4851d936c7 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1326,6 +1326,10 @@ lazy val `scio-smb`: Project = project
   .settings(beamRunnerSettings)
   .settings(
     description := "Sort Merge Bucket source/sink implementations for Apache Beam",
+    unusedCompileDependenciesFilter -= Seq(
+      // ParquetUtils calls functions defined in parent class from hadoop-mapreduce-client-core
+      moduleFilter("org.apache.hadoop", "hadoop-mapreduce-client-core")
+    ).reduce(_ | _),
     libraryDependencies ++= Seq(
       // compile
       "com.fasterxml.jackson.core" % "jackson-annotations" % jacksonVersion,
@@ -1355,6 +1359,7 @@ lazy val `scio-smb`: Project = project
       "com.github.ben-manes.caffeine" % "caffeine" % caffeineVersion % Provided,
       "org.apache.avro" % "avro" % avroVersion % Provided,
       "org.apache.hadoop" % "hadoop-common" % hadoopVersion % Provided,
+      "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % Provided,
       "org.apache.parquet" % "parquet-avro" % parquetVersion % Provided excludeAll ("org.apache.avro" % "avro"),
       "org.apache.parquet" % "parquet-column" % parquetVersion % Provided,
       "org.apache.parquet" % "parquet-common" % parquetVersion % Provided,
diff --git a/scio-parquet/src/main/java/com/spotify/scio/parquet/WriterUtils.java b/scio-parquet/src/main/java/com/spotify/scio/parquet/WriterUtils.java
index 9034d5b194..5d2c6d1caf 100644
--- a/scio-parquet/src/main/java/com/spotify/scio/parquet/WriterUtils.java
+++ b/scio-parquet/src/main/java/com/spotify/scio/parquet/WriterUtils.java
@@ -18,7 +18,11 @@
 package com.spotify.scio.parquet;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -28,12 +32,54 @@ public static <T, SELF extends ParquetWriter.Builder<T, SELF>> ParquetWriter<T>
       ParquetWriter.Builder<T, SELF> builder, Configuration conf, CompressionCodecName compression)
       throws IOException {
     // https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
-    int rowGroupSize =
-        conf.getInt(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);
+    long rowGroupSize =
+        conf.getLong(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);
+
+    for (Map.Entry<String, Boolean> entry :
+        getColumnarConfig(conf, ParquetOutputFormat.BLOOM_FILTER_ENABLED, Boolean::parseBoolean)
+            .entrySet()) {
+      builder = builder.withBloomFilterEnabled(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Boolean> entry :
+        getColumnarConfig(conf, ParquetOutputFormat.ENABLE_DICTIONARY, Boolean::parseBoolean)
+            .entrySet()) {
+      builder = builder.withDictionaryEncoding(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Long> entry :
+        getColumnarConfig(conf, ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV, Long::parseLong)
+            .entrySet()) {
+      builder = builder.withBloomFilterNDV(entry.getKey(), entry.getValue());
+    }
+
     return builder
         .withConf(conf)
         .withCompressionCodec(compression)
+        .withPageSize(ParquetOutputFormat.getPageSize(conf))
+        .withPageRowCountLimit(
+            conf.getInt(
+                ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
+                ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT))
+        .withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf))
+        .withWriterVersion(ParquetOutputFormat.getWriterVersion(conf))
+        .withBloomFilterEnabled(ParquetOutputFormat.getBloomFilterEnabled(conf))
+        .withDictionaryEncoding(ParquetOutputFormat.getEnableDictionary(conf))
+        .withDictionaryPageSize(ParquetOutputFormat.getDictionaryPageSize(conf))
+        .withMaxRowCountForPageSizeCheck(ParquetOutputFormat.getMaxRowCountForPageSizeCheck(conf))
+        .withMinRowCountForPageSizeCheck(ParquetOutputFormat.getMinRowCountForPageSizeCheck(conf))
+        .withValidation(ParquetOutputFormat.getValidation(conf))
         .withRowGroupSize(rowGroupSize)
         .build();
   }
+
+  private static <T> Map<String, T> getColumnarConfig(
+      Configuration conf, String key, Function<String, T> toT) {
+    final String keyPrefix = key + "#";
+    return conf.getPropsWithPrefix(keyPrefix).entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> entry.getKey().replaceFirst(keyPrefix, ""),
+                entry -> toT.apply(entry.getValue())));
+  }
 }
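Note: getColumnarConfig implements Parquet's per-column override convention. A property key
suffixed with "#<column path>" applies only to that column, while the bare key stays the global
default. A minimal sketch of how this surfaces to users via the ParquetConfiguration helper
exercised in the test below (the "user_id" column name here is hypothetical):

    import com.spotify.scio.parquet.ParquetConfiguration

    val conf = ParquetConfiguration.of(
      "parquet.bloom.filter.enabled" -> false,                // global default: no bloom filters
      "parquet.bloom.filter.enabled#user_id" -> true,         // enable only for the "user_id" column
      "parquet.bloom.filter.expected.ndv#user_id" -> 100000L  // expected distinct values for its filter
    )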
diff --git a/scio-parquet/src/test/scala/com/spotify/scio/parquet/TestWriterUtils.scala b/scio-parquet/src/test/scala/com/spotify/scio/parquet/TestWriterUtils.scala
new file mode 100644
index 0000000000..9fece4890b
--- /dev/null
+++ b/scio-parquet/src/test/scala/com/spotify/scio/parquet/TestWriterUtils.scala
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2023 Spotify AB.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.spotify.scio.parquet
+
+import com.spotify.scio._
+import com.spotify.scio.parquet.avro._
+import com.spotify.scio.avro.{Account, AccountStatus}
+import org.apache.commons.io.FileUtils
+import org.apache.parquet.HadoopReadOptions
+import org.apache.parquet.column.Encoding
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import java.nio.file.{Files, Path}
+import scala.jdk.CollectionConverters._
+
+class TestWriterUtils extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
+  private lazy val testDir = Files.createTempDirectory("scio-parquet-writer-utils-test-").toFile
+
+  override protected def afterAll(): Unit = FileUtils.deleteDirectory(testDir)
+
+  "WriterUtils" should "set configuration values correctly" in {
+    val path = testDir.toPath.resolve("avro-files")
+
+    val sc = ScioContext()
+    sc.parallelize(1 to 10)
+      .map { r =>
+        Account
+          .newBuilder()
+          .setId(r)
+          .setType("checking")
+          .setName(r.toString)
+          .setAmount(r.doubleValue)
+          .setAccountStatus(AccountStatus.Active)
+          .build()
+      }
+      .saveAsParquetAvroFile( // WriterUtils has too many protected classes to invoke directly; test via Scio write
+        path.toFile.getAbsolutePath,
+        numShards = 1,
+        conf = ParquetConfiguration.of(
+          "parquet.enable.dictionary" -> false,
+          "parquet.enable.dictionary#account_status" -> true,
+          "parquet.bloom.filter.enabled" -> true,
+          "parquet.bloom.filter.enabled#id" -> false
+        )
+      )
+
+    sc.run()
+
+    val columnEncodings = getColumnEncodings(path.resolve("part-00000-of-00001.parquet"))
+
+    assertColumn(
+      columnEncodings(0),
+      "id",
+      hasBloomFilter = false,
+      Seq(Encoding.BIT_PACKED, Encoding.PLAIN)
+    )
+    assertColumn(
+      columnEncodings(1),
+      "type",
+      hasBloomFilter = true,
+      Seq(Encoding.BIT_PACKED, Encoding.PLAIN)
+    )
+    assertColumn(
+      columnEncodings(2),
+      "name",
+      hasBloomFilter = true,
+      Seq(
+        Encoding.BIT_PACKED,
+        Encoding.RLE, // RLE encoding is used for optional fields
+        Encoding.PLAIN
+      )
+    )
+    assertColumn(
+      columnEncodings(3),
+      "amount",
+      hasBloomFilter = true,
+      Seq(Encoding.BIT_PACKED, Encoding.PLAIN)
+    )
+    assertColumn(
+      columnEncodings(4),
+      "account_status",
+      hasBloomFilter = true,
+      Seq(Encoding.BIT_PACKED, Encoding.RLE, Encoding.PLAIN_DICTIONARY)
+    )
+  }
+
+  private def getColumnEncodings(path: Path): List[ColumnChunkMetaData] = {
+    val options = HadoopReadOptions.builder(ParquetConfiguration.empty()).build
+    val r = ParquetFileReader.open(BeamInputFile.of(path.toFile.getAbsolutePath), options)
+    assert(r.getRowGroups.size() == 1)
+
+    val columns = r.getRowGroups.get(0).getColumns.asScala.toList
+    r.close()
+    columns
+  }
+
+  private def assertColumn(
+    column: ColumnChunkMetaData,
+    name: String,
+    hasBloomFilter: Boolean,
+    encodings: Iterable[Encoding]
+  ): Unit = {
+    column.getPath.asScala should contain only name
+    column.getEncodings.asScala should contain theSameElementsAs encodings
+    if (hasBloomFilter) {
+      column.getBloomFilterOffset should be > -1L
+    } else {
+      column.getBloomFilterOffset shouldBe -1L
+    }
+  }
+}
diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java
index fe79d7ceba..65954b1201 100644
--- a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java
+++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetAvroFileOperations.java
@@ -42,7 +42,6 @@
 import org.apache.parquet.avro.SpecificDataSupplier;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -227,14 +226,9 @@ public void open(WritableByteChannel channel) throws IOException {
       // https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
       final Configuration configuration = conf.get();
-      int rowGroupSize =
-          configuration.getInt(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);
       AvroParquetWriter.Builder<ValueT> builder =
           AvroParquetWriter.<ValueT>builder(new ParquetOutputFile(channel))
-              .withSchema(schemaSupplier.get())
-              .withCompressionCodec(compression)
-              .withConf(configuration)
-              .withRowGroupSize(rowGroupSize);
+              .withSchema(schemaSupplier.get());
 
       // Workaround for PARQUET-2265
       if (configuration.getClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, null) != null) {
@@ -248,7 +242,7 @@ public void open(WritableByteChannel channel) throws IOException {
                 ReflectionUtils.newInstance(dataModelSupplier, configuration).get());
       }
 
-      writer = builder.build();
+      writer = ParquetUtils.buildWriter(builder, configuration, compression);
     }
 
     @Override
diff --git a/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetUtils.java b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetUtils.java
new file mode 100644
index 0000000000..036a709994
--- /dev/null
+++ b/scio-smb/src/main/java/org/apache/beam/sdk/extensions/smb/ParquetUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2023 Spotify AB.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.beam.sdk.extensions.smb;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+class ParquetUtils {
+  static <T, SELF extends ParquetWriter.Builder<T, SELF>> ParquetWriter<T> buildWriter(
+      ParquetWriter.Builder<T, SELF> builder, Configuration conf, CompressionCodecName compression)
+      throws IOException {
+    // https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
+    long rowGroupSize =
+        conf.getLong(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);
+
+    for (Map.Entry<String, Boolean> entry :
+        getColumnarConfig(conf, ParquetOutputFormat.BLOOM_FILTER_ENABLED, Boolean::parseBoolean)
+            .entrySet()) {
+      builder = builder.withBloomFilterEnabled(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Boolean> entry :
+        getColumnarConfig(conf, ParquetOutputFormat.ENABLE_DICTIONARY, Boolean::parseBoolean)
+            .entrySet()) {
+      builder = builder.withDictionaryEncoding(entry.getKey(), entry.getValue());
+    }
+
+    for (Map.Entry<String, Long> entry :
+        getColumnarConfig(conf, ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV, Long::parseLong)
+            .entrySet()) {
+      builder = builder.withBloomFilterNDV(entry.getKey(), entry.getValue());
+    }
+
+    return builder
+        .withConf(conf)
+        .withCompressionCodec(compression)
+        .withPageSize(ParquetOutputFormat.getPageSize(conf))
+        .withPageRowCountLimit(
+            conf.getInt(
+                ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
+                ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT))
+        .withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf))
+        .withWriterVersion(ParquetOutputFormat.getWriterVersion(conf))
+        .withBloomFilterEnabled(ParquetOutputFormat.getBloomFilterEnabled(conf))
+        .withDictionaryEncoding(ParquetOutputFormat.getEnableDictionary(conf))
+        .withDictionaryPageSize(ParquetOutputFormat.getDictionaryPageSize(conf))
+        .withMaxRowCountForPageSizeCheck(ParquetOutputFormat.getMaxRowCountForPageSizeCheck(conf))
+        .withMinRowCountForPageSizeCheck(ParquetOutputFormat.getMinRowCountForPageSizeCheck(conf))
+        .withValidation(ParquetOutputFormat.getValidation(conf))
+        .withRowGroupSize(rowGroupSize)
+        .build();
+  }
+
+  private static <T> Map<String, T> getColumnarConfig(
+      Configuration conf, String key, Function<String, T> toT) {
+    final String keyPrefix = key + "#";
+    return conf.getPropsWithPrefix(keyPrefix).entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> entry.getKey().replaceFirst(keyPrefix, ""),
+                entry -> toT.apply(entry.getValue())));
+  }
+}
diff --git a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala
index 147f794673..ee99146679 100644
--- a/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala
+++ b/scio-smb/src/main/scala/org/apache/beam/sdk/extensions/smb/ParquetTypeFileOperations.scala
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.util.MimeTypes
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterPredicate
-import org.apache.parquet.hadoop.{ParquetOutputFormat, ParquetReader, ParquetWriter}
+import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
 import java.nio.channels.{ReadableByteChannel, WritableByteChannel}
@@ -127,17 +127,13 @@ private case class ParquetTypeSink[T](
     extends FileIO.Sink[T] {
   @transient private var writer: ParquetWriter[T] = _
 
-  override def open(channel: WritableByteChannel): Unit = {
-    // https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
-    val rowGroupSize =
-      conf.get().getLong(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE)
-    writer = pt
-      .writeBuilder(new ParquetOutputFile(channel))
-      .withCompressionCodec(compression)
-      .withConf(conf.get())
-      .withRowGroupSize(rowGroupSize)
-      .build()
-  }
+  override def open(channel: WritableByteChannel): Unit =
+    writer = ParquetUtils.buildWriter(
+      pt
+        .writeBuilder(new ParquetOutputFile(channel)),
+      conf.get(),
+      compression
+    )
 
   override def write(element: T): Unit = writer.write(element)
   override def flush(): Unit = writer.close()
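With this patch, Parquet writer tuning can be expressed entirely through Hadoop configuration for
both scio-parquet and scio-smb writes. A minimal end-to-end sketch, assuming the Account test
schema used above and a hypothetical "output" pipeline argument (the property keys are the
standard ParquetOutputFormat names, now forwarded to the underlying ParquetWriter.Builder):

    import com.spotify.scio._
    import com.spotify.scio.avro.{Account, AccountStatus}
    import com.spotify.scio.parquet.ParquetConfiguration
    import com.spotify.scio.parquet.avro._

    object WriterOptionsExample {
      def main(cmdlineArgs: Array[String]): Unit = {
        val (sc, args) = ContextAndArgs(cmdlineArgs)
        sc.parallelize(1 to 100)
          .map { i =>
            Account
              .newBuilder()
              .setId(i)
              .setType("checking")
              .setName(i.toString)
              .setAmount(i.doubleValue)
              .setAccountStatus(AccountStatus.Active)
              .build()
          }
          .saveAsParquetAvroFile(
            args("output"),
            conf = ParquetConfiguration.of(
              "parquet.writer.version" -> "PARQUET_2_0",     // writer format version
              "parquet.page.size" -> 2 * 1024 * 1024,        // 2 MiB pages
              "parquet.page.row.count.limit" -> 10000,       // cap rows per page
              "parquet.bloom.filter.enabled#name" -> true    // bloom filter for one column
            )
          )
        sc.run()
      }
    }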