Propagate ParquetOutputFormat options to ParquetWriter (#4980)
* Propagate ParquetOutputFormat options to ParquetWriter

* fixup

* fixup

* cleanup test

* Apply configuration in scio-smb Parquet as well

* Add missing runtime dep to scio-smb

* add page size, writer version

* Add page properties

* javafmt

* remove unused import

* scope to test

* Fix build issues

---------

Co-authored-by: Michel Davit <[email protected]>
clairemcginty and RustedBones committed Sep 19, 2023
1 parent 6fe7a29 commit 110f795
Showing 6 changed files with 279 additions and 22 deletions.
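With this change, ParquetOutputFormat tuning options set on the Hadoop Configuration (block size, page size, dictionary encoding, bloom filters, writer version, validation, and their per-column "#<column>" variants) are explicitly applied to the ParquetWriter built by Scio's Parquet sinks. A minimal usage sketch in Scala, adapted from the test added in this commit: the output path, shard count, and option values are illustrative, the import of ParquetConfiguration from com.spotify.scio.parquet is inferred from that test's package, and ParquetConfiguration.of is assumed to accept numeric values the same way it accepts the booleans used in the test.

    import com.spotify.scio._
    import com.spotify.scio.avro.{Account, AccountStatus}
    import com.spotify.scio.parquet.ParquetConfiguration
    import com.spotify.scio.parquet.avro._

    object WriteTunedParquet {
      def main(args: Array[String]): Unit = {
        val sc = ScioContext()
        sc.parallelize(1 to 10)
          .map { i =>
            Account
              .newBuilder()
              .setId(i)
              .setType("checking")
              .setName(i.toString)
              .setAmount(i.toDouble)
              .setAccountStatus(AccountStatus.Active)
              .build()
          }
          .saveAsParquetAvroFile(
            "/tmp/accounts", // illustrative output path
            numShards = 1,
            conf = ParquetConfiguration.of(
              // Standard ParquetOutputFormat keys; a "#<column>" suffix scopes the
              // setting to a single column, overriding the file-level value.
              "parquet.block.size" -> 64 * 1024 * 1024,
              "parquet.enable.dictionary" -> false,
              "parquet.enable.dictionary#account_status" -> true,
              "parquet.bloom.filter.enabled" -> true,
              "parquet.bloom.filter.enabled#id" -> false
            )
          )
        sc.run()
      }
    }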
5 changes: 5 additions & 0 deletions build.sbt
@@ -1326,6 +1326,10 @@ lazy val `scio-smb`: Project = project
.settings(beamRunnerSettings)
.settings(
description := "Sort Merge Bucket source/sink implementations for Apache Beam",
unusedCompileDependenciesFilter -= Seq(
// ParquetUtils calls functions defined in parent class from hadoop-mapreduce-client-core
moduleFilter("org.apache.hadoop", "hadoop-mapreduce-client-core")
).reduce(_ | _),
libraryDependencies ++= Seq(
// compile
"com.fasterxml.jackson.core" % "jackson-annotations" % jacksonVersion,
@@ -1355,6 +1359,7 @@ lazy val `scio-smb`: Project = project
"com.github.ben-manes.caffeine" % "caffeine" % caffeineVersion % Provided,
"org.apache.avro" % "avro" % avroVersion % Provided,
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % Provided,
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % Provided,
"org.apache.parquet" % "parquet-avro" % parquetVersion % Provided excludeAll ("org.apache.avro" % "avro"),
"org.apache.parquet" % "parquet-column" % parquetVersion % Provided,
"org.apache.parquet" % "parquet-common" % parquetVersion % Provided,
@@ -18,7 +18,11 @@
package com.spotify.scio.parquet;

import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -28,12 +32,54 @@ public static <T, SELF extends ParquetWriter.Builder<T, SELF>> ParquetWriter<T>
ParquetWriter.Builder<T, SELF> builder, Configuration conf, CompressionCodecName compression)
throws IOException {
// https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
int rowGroupSize =
conf.getInt(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);
long rowGroupSize =
conf.getLong(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);

for (Map.Entry<String, Boolean> entry :
getColumnarConfig(conf, ParquetOutputFormat.BLOOM_FILTER_ENABLED, Boolean::parseBoolean)
.entrySet()) {
builder = builder.withBloomFilterEnabled(entry.getKey(), entry.getValue());
}

for (Map.Entry<String, Boolean> entry :
getColumnarConfig(conf, ParquetOutputFormat.ENABLE_DICTIONARY, Boolean::parseBoolean)
.entrySet()) {
builder = builder.withDictionaryEncoding(entry.getKey(), entry.getValue());
}

for (Map.Entry<String, Long> entry :
getColumnarConfig(conf, ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV, Long::parseLong)
.entrySet()) {
builder = builder.withBloomFilterNDV(entry.getKey(), entry.getValue());
}

return builder
.withConf(conf)
.withCompressionCodec(compression)
.withPageSize(ParquetOutputFormat.getPageSize(conf))
.withPageRowCountLimit(
conf.getInt(
ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT))
.withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf))
.withWriterVersion(ParquetOutputFormat.getWriterVersion(conf))
.withBloomFilterEnabled(ParquetOutputFormat.getBloomFilterEnabled(conf))
.withDictionaryEncoding(ParquetOutputFormat.getEnableDictionary(conf))
.withDictionaryPageSize(ParquetOutputFormat.getDictionaryPageSize(conf))
.withMaxRowCountForPageSizeCheck(ParquetOutputFormat.getMaxRowCountForPageSizeCheck(conf))
.withMinRowCountForPageSizeCheck(ParquetOutputFormat.getMinRowCountForPageSizeCheck(conf))
.withValidation(ParquetOutputFormat.getValidation(conf))
.withRowGroupSize(rowGroupSize)
.build();
}

private static <T> Map<String, T> getColumnarConfig(
Configuration conf, String key, Function<String, T> toT) {
final String keyPrefix = key + "#";
return conf.getPropsWithPrefix(keyPrefix).entrySet().stream()
.collect(
Collectors.toMap(
entry -> entry.getKey().replaceFirst(keyPrefix, ""),
entry -> toT.apply(entry.getValue())));
}
}
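The getColumnarConfig helper above is what enables the per-column form of these options: every key of the shape "<option>#<column.path>" is collected with Configuration.getPropsWithPrefix, the prefix is stripped, and the remainder is used as the column path passed to the builder. A standalone Scala sketch of the same parsing (the key names and values are illustrative):

    import org.apache.hadoop.conf.Configuration
    import scala.jdk.CollectionConverters._

    object ColumnScopedKeys {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration(false)
        conf.set("parquet.enable.dictionary#account_status", "true")
        conf.set("parquet.enable.dictionary#id", "false")

        // Strip the "<option>#" prefix and parse each value, keyed by column path --
        // the same shape getColumnarConfig builds before the builder loop applies it.
        val prefix = "parquet.enable.dictionary" + "#"
        val perColumn: Map[String, Boolean] =
          conf
            .getPropsWithPrefix(prefix)
            .asScala
            .map { case (column, value) => column -> value.toBoolean }
            .toMap

        assert(perColumn == Map("account_status" -> true, "id" -> false))
      }
    }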
@@ -0,0 +1,130 @@
/*
* Copyright 2023 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.parquet

import com.spotify.scio._
import com.spotify.scio.parquet.avro._
import com.spotify.scio.avro.{Account, AccountStatus}
import org.apache.commons.io.FileUtils
import org.apache.parquet.HadoopReadOptions
import org.apache.parquet.column.Encoding
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

class TestWriterUtils extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
private lazy val testDir = Files.createTempDirectory("scio-parquet-writer-utils-test-").toFile

override protected def afterAll(): Unit = FileUtils.deleteDirectory(testDir)

"WriterUtils" should "set configuration values correctly" in {
val path = testDir.toPath.resolve("avro-files")

val sc = ScioContext()
sc.parallelize(1 to 10)
.map { r =>
Account
.newBuilder()
.setId(r)
.setType("checking")
.setName(r.toString)
.setAmount(r.doubleValue)
.setAccountStatus(AccountStatus.Active)
.build()
}
.saveAsParquetAvroFile( // WriterUtils has too many protected classes to invoke directly; test via Scio write
path.toFile.getAbsolutePath,
numShards = 1,
conf = ParquetConfiguration.of(
"parquet.enable.dictionary" -> false,
"parquet.enable.dictionary#account_status" -> true,
"parquet.bloom.filter.enabled" -> true,
"parquet.bloom.filter.enabled#id" -> false
)
)

sc.run()

val columnEncodings = getColumnEncodings(path.resolve("part-00000-of-00001.parquet"))

assertColumn(
columnEncodings(0),
"id",
hasBloomFilter = false,
Seq(Encoding.BIT_PACKED, Encoding.PLAIN)
)
assertColumn(
columnEncodings(1),
"type",
hasBloomFilter = true,
Seq(Encoding.BIT_PACKED, Encoding.PLAIN)
)
assertColumn(
columnEncodings(2),
"name",
hasBloomFilter = true,
Seq(
Encoding.BIT_PACKED,
Encoding.RLE, // RLE encoding is used for optional fields
Encoding.PLAIN
)
)
assertColumn(
columnEncodings(3),
"amount",
hasBloomFilter = true,
Seq(Encoding.BIT_PACKED, Encoding.PLAIN)
)
assertColumn(
columnEncodings(4),
"account_status",
hasBloomFilter = true,
Seq(Encoding.BIT_PACKED, Encoding.RLE, Encoding.PLAIN_DICTIONARY)
)
}

private def getColumnEncodings(path: Path): List[ColumnChunkMetaData] = {
val options = HadoopReadOptions.builder(ParquetConfiguration.empty()).build
val r = ParquetFileReader.open(BeamInputFile.of(path.toFile.getAbsolutePath), options)
assert(r.getRowGroups.size() == 1)

val columns = r.getRowGroups.get(0).getColumns.asScala.toList
r.close()
columns
}

private def assertColumn(
column: ColumnChunkMetaData,
name: String,
hasBloomFilter: Boolean,
encodings: Iterable[Encoding]
): Unit = {
column.getPath.asScala should contain only name
column.getEncodings.asScala should contain theSameElementsAs encodings
if (hasBloomFilter) {
column.getBloomFilterOffset should be > -1L
} else {
column.getBloomFilterOffset shouldBe -1L
}
}
}
@@ -42,7 +42,6 @@
import org.apache.parquet.avro.SpecificDataSupplier;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -227,14 +226,9 @@ public void open(WritableByteChannel channel) throws IOException {
// https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
final Configuration configuration = conf.get();

int rowGroupSize =
configuration.getInt(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);
AvroParquetWriter.Builder<ValueT> builder =
AvroParquetWriter.<ValueT>builder(new ParquetOutputFile(channel))
.withSchema(schemaSupplier.get())
.withCompressionCodec(compression)
.withConf(configuration)
.withRowGroupSize(rowGroupSize);
.withSchema(schemaSupplier.get());

// Workaround for PARQUET-2265
if (configuration.getClass(AvroWriteSupport.AVRO_DATA_SUPPLIER, null) != null) {
@@ -248,7 +242,7 @@ public void open(WritableByteChannel channel) throws IOException {
ReflectionUtils.newInstance(dataModelSupplier, configuration).get());
}

writer = builder.build();
writer = ParquetUtils.buildWriter(builder, configuration, compression);
}

@Override
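For the PARQUET-2265 workaround in the hunk above, withDataModel is only applied when a data supplier class has been set explicitly on the Configuration. A hedged Scala sketch of how a caller might opt in, using the constant and classes already imported in this file (the Configuration instance here is illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.avro.{AvroDataSupplier, AvroWriteSupport, SpecificDataSupplier}

    object PinAvroDataModel {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration(false)
        // With this set, open() above instantiates the supplier reflectively and
        // passes supplier.get() to builder.withDataModel(...).
        conf.setClass(
          AvroWriteSupport.AVRO_DATA_SUPPLIER,
          classOf[SpecificDataSupplier],
          classOf[AvroDataSupplier]
        )
      }
    }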
@@ -0,0 +1,86 @@
/*
* Copyright 2023 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.beam.sdk.extensions.smb;

import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

class ParquetUtils {
static <T, SELF extends ParquetWriter.Builder<T, SELF>> ParquetWriter<T> buildWriter(
ParquetWriter.Builder<T, SELF> builder, Configuration conf, CompressionCodecName compression)
throws IOException {
// https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
long rowGroupSize =
conf.getLong(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE);

for (Map.Entry<String, Boolean> entry :
getColumnarConfig(conf, ParquetOutputFormat.BLOOM_FILTER_ENABLED, Boolean::parseBoolean)
.entrySet()) {
builder = builder.withBloomFilterEnabled(entry.getKey(), entry.getValue());
}

for (Map.Entry<String, Boolean> entry :
getColumnarConfig(conf, ParquetOutputFormat.ENABLE_DICTIONARY, Boolean::parseBoolean)
.entrySet()) {
builder = builder.withDictionaryEncoding(entry.getKey(), entry.getValue());
}

for (Map.Entry<String, Long> entry :
getColumnarConfig(conf, ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV, Long::parseLong)
.entrySet()) {
builder = builder.withBloomFilterNDV(entry.getKey(), entry.getValue());
}

return builder
.withConf(conf)
.withCompressionCodec(compression)
.withPageSize(ParquetOutputFormat.getPageSize(conf))
.withPageRowCountLimit(
conf.getInt(
ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT,
ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT))
.withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf))
.withWriterVersion(ParquetOutputFormat.getWriterVersion(conf))
.withBloomFilterEnabled(ParquetOutputFormat.getBloomFilterEnabled(conf))
.withDictionaryEncoding(ParquetOutputFormat.getEnableDictionary(conf))
.withDictionaryPageSize(ParquetOutputFormat.getDictionaryPageSize(conf))
.withMaxRowCountForPageSizeCheck(ParquetOutputFormat.getMaxRowCountForPageSizeCheck(conf))
.withMinRowCountForPageSizeCheck(ParquetOutputFormat.getMinRowCountForPageSizeCheck(conf))
.withValidation(ParquetOutputFormat.getValidation(conf))
.withRowGroupSize(rowGroupSize)
.build();
}

private static <T> Map<String, T> getColumnarConfig(
Configuration conf, String key, Function<String, T> toT) {
final String keyPrefix = key + "#";
return conf.getPropsWithPrefix(keyPrefix).entrySet().stream()
.collect(
Collectors.toMap(
entry -> entry.getKey().replaceFirst(keyPrefix, ""),
entry -> toT.apply(entry.getValue())));
}
}
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.util.MimeTypes
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.{ParquetOutputFormat, ParquetReader, ParquetWriter}
import org.apache.parquet.hadoop.{ParquetReader, ParquetWriter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import java.nio.channels.{ReadableByteChannel, WritableByteChannel}
@@ -127,17 +127,13 @@ private case class ParquetTypeSink[T](
extends FileIO.Sink[T] {
@transient private var writer: ParquetWriter[T] = _

override def open(channel: WritableByteChannel): Unit = {
// https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
val rowGroupSize =
conf.get().getLong(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE)
writer = pt
.writeBuilder(new ParquetOutputFile(channel))
.withCompressionCodec(compression)
.withConf(conf.get())
.withRowGroupSize(rowGroupSize)
.build()
}
override def open(channel: WritableByteChannel): Unit =
writer = ParquetUtils.buildWriter(
pt
.writeBuilder(new ParquetOutputFile(channel)),
conf.get(),
compression
)

override def write(element: T): Unit = writer.write(element)
override def flush(): Unit = writer.close()
