SNOW-1031623 Allow Parquet Version 2 #672

Draft · wants to merge 3 commits into base: master
@@ -18,23 +18,29 @@ public class ClientBufferParameters {

private Constants.BdecParquetCompression bdecParquetCompression;

private Constants.BdecParquetVersion bdecParquetVersion;

/**
* Private constructor used for test methods
*
* @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
* enabled
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @param bdecParquetCompression compression algorithm used by parquet
* @param bdecParquetVersion version of parquet used in bdec files
*/
private ClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
- Constants.BdecParquetCompression bdecParquetCompression) {
+ Constants.BdecParquetCompression bdecParquetCompression,
+ Constants.BdecParquetVersion bdecParquetVersion) {
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.bdecParquetVersion = bdecParquetVersion;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -55,25 +61,33 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
: ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;
this.bdecParquetVersion =
clientInternal != null
? clientInternal.getParameterProvider().getBdecParquetVersion()
: ParameterProvider.BDEC_PARQUET_VERSION_DEFAULT;
}

/**
* @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
* enabled
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @param bdecParquetCompression compression algorithm used by parquet
* @param bdecParquetVersion version of parquet used in bdec files
* @return ClientBufferParameters object
*/
public static ClientBufferParameters test_createClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
- Constants.BdecParquetCompression bdecParquetCompression) {
+ Constants.BdecParquetCompression bdecParquetCompression,
+ Constants.BdecParquetVersion bdecParquetVersion) {
return new ClientBufferParameters(
enableParquetInternalBuffering,
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
- bdecParquetCompression);
+ bdecParquetCompression,
+ bdecParquetVersion);
}

public boolean getEnableParquetInternalBuffering() {
@@ -91,4 +105,8 @@ public long getMaxAllowedRowSizeInBytes() {
public Constants.BdecParquetCompression getBdecParquetCompression() {
return bdecParquetCompression;
}

public Constants.BdecParquetVersion getBdecParquetVersion() {
return bdecParquetVersion;
}
}
@@ -30,6 +30,8 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {

private final Constants.BdecParquetCompression bdecParquetCompression;

private final Constants.BdecParquetVersion bdecParquetVersion;

/**
* Construct parquet flusher from its schema and set flag that indicates whether Parquet memory
* optimization is enabled, i.e. rows will be buffered in internal Parquet buffer.
@@ -38,11 +40,13 @@ public ParquetFlusher(
MessageType schema,
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
- Constants.BdecParquetCompression bdecParquetCompression) {
+ Constants.BdecParquetCompression bdecParquetCompression,
+ Constants.BdecParquetVersion bdecParquetVersion) {
this.schema = schema;
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.bdecParquetVersion = bdecParquetVersion;
}

@Override
@@ -210,7 +214,8 @@ private SerializationResult serializeFromJavaObjects(
metadata,
firstChannelFullyQualifiedTableName,
maxChunkSizeInBytes,
- bdecParquetCompression);
+ bdecParquetCompression,
+ bdecParquetVersion);
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

@@ -123,7 +123,8 @@ private void createFileWriter() {
metadata,
channelName,
clientBufferParameters.getMaxChunkSizeInBytes(),
- clientBufferParameters.getBdecParquetCompression());
+ clientBufferParameters.getBdecParquetCompression(),
+ clientBufferParameters.getBdecParquetVersion());
} else {
this.bdecParquetWriter = null;
}
@@ -325,7 +326,8 @@ public Flusher<ParquetChunkData> createFlusher() {
schema,
clientBufferParameters.getEnableParquetInternalBuffering(),
clientBufferParameters.getMaxChunkSizeInBytes(),
- clientBufferParameters.getBdecParquetCompression());
+ clientBufferParameters.getBdecParquetCompression(),
+ clientBufferParameters.getBdecParquetVersion());
}

private static class ParquetColumn {
43 changes: 43 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -5,6 +5,11 @@
package net.snowflake.ingest.utils;

import java.util.Arrays;

import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
import org.apache.parquet.column.values.factory.DefaultV2ValuesWriterFactory;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/** Contains all the constants needed for Streaming Ingest */
@@ -137,6 +142,44 @@ public static BdecParquetCompression fromName(String name) {
name, Arrays.asList(BdecParquetCompression.values())));
}
}

/**
* Version of Parquet used in BDEC files. It is a thin wrapper around the Parquet library's
* ParquetProperties.WriterVersion; we keep our own enum to control which options are exposed,
* even though all existing writer versions are currently supported.
*/
public enum BdecParquetVersion {
PARQUET_1_0,
PARQUET_2_0;

public ParquetProperties.WriterVersion getWriterVersion() {
return ParquetProperties.WriterVersion.valueOf(this.name());
}

public ValuesWriterFactory getValuesWriterFactory() {
if (this == PARQUET_1_0) {
return new DefaultV1ValuesWriterFactory();
} else if (this == PARQUET_2_0) {
return new DefaultV2ValuesWriterFactory();
} else {
throw new IllegalArgumentException(
String.format(
"Unsupported BDEC_PARQUET_VERSION = '%s', allowed values are \"PARQUET_1_0\", \"PARQUET_2_0\"",
this.name()));
}
}

public static BdecParquetVersion fromName(String name) {
for (BdecParquetVersion v : BdecParquetVersion.values()) {
if (v.name().toLowerCase().equals(name.toLowerCase())) {
return v;
}
}
throw new IllegalArgumentException(
String.format(
"Unsupported BDEC_PARQUET_VERSION = '%s', allowed values are %s",
name, Arrays.asList(BdecParquetVersion.values())));
}
}
// Parameters
public static final boolean DISABLE_BACKGROUND_FLUSH = false;
public static final boolean COMPRESS_BLOB_TWICE = false;
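For context on how the new enum is intended to be consumed, here is a minimal, self-contained sketch (the demo class name and the main method are illustrative only; Constants, ParquetProperties, and ValuesWriterFactory are the types from the hunk above):

```java
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;

import net.snowflake.ingest.utils.Constants;

public class BdecParquetVersionDemo {
  public static void main(String[] args) {
    // Name lookup is case-insensitive, mirroring BdecParquetCompression.fromName
    Constants.BdecParquetVersion version = Constants.BdecParquetVersion.fromName("parquet_2_0");

    // Maps one-to-one onto the Parquet library's writer version...
    ParquetProperties.WriterVersion writerVersion = version.getWriterVersion(); // PARQUET_2_0

    // ...and picks the matching values-writer factory (V1 vs V2 encodings)
    ValuesWriterFactory factory = version.getValuesWriterFactory(); // DefaultV2ValuesWriterFactory

    System.out.println(writerVersion + " / " + factory.getClass().getSimpleName());
  }
}
```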
22 changes: 22 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -39,6 +39,8 @@ public class ParameterProvider {
public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM =
"BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase();

public static final String BDEC_PARQUET_VERSION = "BDEC_PARQUET_VERSION".toLowerCase();

// Default values
public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000;
@@ -64,6 +66,9 @@ public class ParameterProvider {
public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
Constants.BdecParquetCompression.GZIP;

public static final Constants.BdecParquetVersion BDEC_PARQUET_VERSION_DEFAULT =
Constants.BdecParquetVersion.PARQUET_2_0;

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
It reduces memory consumption compared to using Java Objects for buffering.*/
public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
@@ -188,6 +193,12 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterOverrides,
props);

this.updateValue(
BDEC_PARQUET_VERSION,
BDEC_PARQUET_VERSION_DEFAULT,
parameterOverrides,
props);
}

/** @return Longest interval in milliseconds between buffer flushes */
@@ -407,6 +418,17 @@ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() {
return Constants.BdecParquetCompression.fromName((String) val);
}

/** @return BDEC parquet version */
public Constants.BdecParquetVersion getBdecParquetVersion() {
Object val =
this.parameterMap.getOrDefault(
BDEC_PARQUET_VERSION, BDEC_PARQUET_VERSION_DEFAULT);
if (val instanceof Constants.BdecParquetVersion) {
return (Constants.BdecParquetVersion) val;
}
return Constants.BdecParquetVersion.fromName((String) val);
}

@Override
public String toString() {
return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
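For reference, a minimal sketch of how a caller could override the new parameter and have ParameterProvider resolve it, mirroring the test setup further below (the demo class and the chosen override value are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;

public class ParquetVersionOverrideDemo {
  public static void main(String[] args) {
    Map<String, Object> parameterOverrides = new HashMap<>();
    // getBdecParquetVersion accepts either the enum name (any case) or a
    // Constants.BdecParquetVersion instance; without an override it falls back
    // to BDEC_PARQUET_VERSION_DEFAULT (PARQUET_2_0).
    parameterOverrides.put(ParameterProvider.BDEC_PARQUET_VERSION, "parquet_1_0");

    ParameterProvider parameterProvider =
        new ParameterProvider(parameterOverrides, new Properties());

    Constants.BdecParquetVersion version = parameterProvider.getBdecParquetVersion();
    System.out.println(version); // PARQUET_1_0
  }
}
```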
11 changes: 6 additions & 5 deletions src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
@@ -51,10 +51,11 @@ public BdecParquetWriter(
Map<String, String> extraMetaData,
String channelName,
long maxChunkSizeInBytes,
- Constants.BdecParquetCompression bdecParquetCompression)
+ Constants.BdecParquetCompression bdecParquetCompression,
+ Constants.BdecParquetVersion bdecParquetVersion)
throws IOException {
OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes);
- ParquetProperties encodingProps = createParquetProperties();
+ ParquetProperties encodingProps = createParquetProperties(bdecParquetVersion);
Configuration conf = new Configuration();
WriteSupport<List<Object>> writeSupport =
new BdecWriteSupport(schema, extraMetaData, channelName);
@@ -119,7 +120,7 @@ public void close() throws IOException {
}
}

- private static ParquetProperties createParquetProperties() {
+ private static ParquetProperties createParquetProperties(Constants.BdecParquetVersion bdecParquetVersion) {
/**
* There are two main limitations on the server side that we have to overcome by tweaking
* Parquet limits:
@@ -147,8 +148,8 @@ private static ParquetProperties createParquetProperties() {
return ParquetProperties.builder()
// PARQUET_2_0 uses Encoding.DELTA_BYTE_ARRAY for byte arrays (e.g. SF sb16)
// server side does not support it TODO: SNOW-657238
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
- .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
+ .withWriterVersion(bdecParquetVersion.getWriterVersion())
+ .withValuesWriterFactory(bdecParquetVersion.getValuesWriterFactory())
// the dictionary encoding (Encoding.*_DICTIONARY) is not supported by server side
// scanner yet
.withDictionaryEncoding(false)
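Putting the Constants and BdecParquetWriter changes together, the parameterized properties construction would look roughly like the sketch below (a condensed stand-in, not the actual method: the real createParquetProperties also applies the server-side size limits discussed in the elided comment above, and the demo class and helper name are illustrative only):

```java
import org.apache.parquet.column.ParquetProperties;

import net.snowflake.ingest.utils.Constants;

public class ParquetPropertiesSketch {
  // Condensed stand-in for BdecParquetWriter.createParquetProperties after this change.
  static ParquetProperties propertiesFor(Constants.BdecParquetVersion bdecParquetVersion) {
    return ParquetProperties.builder()
        // Writer version and values-writer factory are now driven by the client parameter
        // instead of being hard-coded to PARQUET_1_0 / DefaultV1ValuesWriterFactory.
        .withWriterVersion(bdecParquetVersion.getWriterVersion())
        .withValuesWriterFactory(bdecParquetVersion.getValuesWriterFactory())
        // Dictionary encodings stay disabled: the server-side scanner does not support them yet.
        .withDictionaryEncoding(false)
        .build();
  }

  public static void main(String[] args) {
    ParquetProperties v2Props = propertiesFor(Constants.BdecParquetVersion.PARQUET_2_0);
    System.out.println(v2Props.getWriterVersion()); // PARQUET_2_0
  }
}
```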
@@ -24,6 +24,7 @@ private Map<String, Object> getStartingParameterMap() {
parameterMap.put(ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES, 1000L);
parameterMap.put(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, 1000000L);
parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "gzip");
parameterMap.put(ParameterProvider.BDEC_PARQUET_VERSION, "PARQUET_2_0");
return parameterMap;
}

@@ -135,6 +136,9 @@ public void withDefaultValues() {
Assert.assertEquals(
ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterProvider.getBdecParquetCompressionAlgorithm());
Assert.assertEquals(
ParameterProvider.BDEC_PARQUET_VERSION_DEFAULT,
parameterProvider.getBdecParquetVersion());
}

@Test
Expand Down Expand Up @@ -296,7 +300,7 @@ public void testMaxChunksInBlobAndRegistrationRequest() {
}

@Test
- public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() {
+ public void testValidCompressionAlgorithmsWithUppercaseLowerCase() {
List<String> gzipValues = Arrays.asList("GZIP", "gzip", "Gzip", "gZip");
gzipValues.forEach(
v -> {
@@ -326,4 +330,36 @@ public void testInvalidCompressionAlgorithm() {
e.getMessage());
}
}

@Test
public void testValidParquetVersionsWithUppercaseLowerCase() {
List<String> gzipValues = Arrays.asList("PARQUET_2_0", "Parquet_2_0", "parquet_2_0", "pArquet_2_0");
gzipValues.forEach(
v -> {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.BDEC_PARQUET_VERSION, v);
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(
Constants.BdecParquetVersion.PARQUET_2_0,
parameterProvider.getBdecParquetVersion());
});
}

@Test
public void testInvalidParquetVersion() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.BDEC_PARQUET_VERSION, "invalid_version");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBdecParquetVersion();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertEquals(
"Unsupported BDEC_PARQUET_VERSION = 'invalid_version', allowed values are"
+ " [PARQUET_1_0, PARQUET_2_0]",
e.getMessage());
}
}
}
@@ -121,7 +121,8 @@ private AbstractRowBuffer<?> createTestBuffer(OpenChannelRequest.OnErrorOption o
enableParquetMemoryOptimization,
MAX_CHUNK_SIZE_IN_BYTES_DEFAULT,
MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT,
- Constants.BdecParquetCompression.GZIP));
+ Constants.BdecParquetCompression.GZIP,
+ Constants.BdecParquetVersion.PARQUET_2_0));
}

@Test