
Commit

move fileId calculation out of ParquetFlusher

The primary reason for this PR is to allow passing a custom fileId for
testing PPN behavior in Snowflake. This unblocks migrating some tests to
Snowfort.
sfc-gh-skurella committed Feb 7, 2025
1 parent df6bc9e commit 81d7729
Showing 7 changed files with 58 additions and 37 deletions.
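In short, file-ID resolution moves out of ParquetFlusher and up into BlobBuilder: the builder now resolves the ID once, preferring a caller-supplied Optional over the path-derived default, and threads the result through Flusher.serialize. Below is a minimal, self-contained Java sketch of that selection pattern; the class name and the "ppn_test_file_id.parquet" value are illustrative only, and the real constructBlobAndMetadata and serialize methods take additional parameters not repeated here.

import java.util.Optional;

class FileIdSelectionSketch {
  public static void main(String[] args) {
    // Default that BlobBuilder.defaultFileId would derive from the blob path.
    String derivedDefault = "a.parquet";

    // Production path: FlushService passes Optional.empty(), so the derived default wins.
    String fileId = Optional.<String>empty().orElse(derivedDefault);
    System.out.println(fileId); // a.parquet

    // Test path: a caller-supplied ID (hypothetical value) overrides the default.
    fileId = Optional.of("ppn_test_file_id.parquet").orElse(derivedDefault);
    System.out.println(fileId); // ppn_test_file_id.parquet
  }
}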
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.zip.CRC32;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
@@ -36,6 +37,7 @@
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.codec.binary.Hex;
import org.apache.parquet.Preconditions;

/**
* Build a single blob file that contains file header plus data. The header will be a
@@ -62,13 +64,15 @@ class BlobBuilder {
* Builds blob.
*
* @param filePath Path of the destination file in cloud storage
* @param customFileId Allows setting a custom file ID to be embedded for all chunks in storage.
* @param blobData All the data for one blob. Assumes that all ChannelData in the inner List
* belongs to the same table. Will error if this is not the case
* @param bdecVersion version of blob
* @return {@link Blob} data
*/
static <T> Blob constructBlobAndMetadata(
String filePath,
Optional<String> customFileId,
List<List<ChannelData<T>>> blobData,
Constants.BdecVersion bdecVersion,
InternalParameterProvider internalParameterProvider,
@@ -100,8 +104,9 @@ static <T> Blob constructBlobAndMetadata(
firstChannelFlushContext.getEncryptionKeyId()));

Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
String fileId = customFileId.orElse(defaultFileId(filePath, curDataSize));
Flusher.SerializationResult serializedChunk =
flusher.serialize(channelsDataPerTable, filePath, curDataSize);
flusher.serialize(channelsDataPerTable, filePath, curDataSize, fileId);

if (!serializedChunk.channelsMetadataList.isEmpty()) {
final byte[] compressedChunkData;
@@ -207,6 +212,19 @@ static <T> Blob constructBlobAndMetadata(
return new Blob(blobBytes, chunksMetadataList, new BlobStats());
}

private static String defaultFileId(String filePath, long chunkStartOffset) {
String shortName = StreamingIngestUtils.getShortname(filePath);
if (chunkStartOffset == 0) {
return shortName;
} else {
// Using chunk offset as suffix ensures that for interleaved tables, the file
// id key is unique for each chunk.
final String[] parts = shortName.split("\\.");
Preconditions.checkState(parts.length == 2, "Invalid file name format");
return String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1]);
}
}

/**
* Pad the compressed data for encryption. Encryption needs padding to the
* ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES to align with decryption on the Snowflake query path
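For reference, here is a standalone sketch of the naming scheme the new defaultFileId helper implements. The class below is illustrative, not part of the SDK, and it takes the already-shortened name directly, whereas the real method first derives it via StreamingIngestUtils.getShortname(filePath).

class DefaultFileIdSketch {
  // Chunk 0 keeps the short blob name; later chunks get the chunk start offset
  // spliced in before the extension so each chunk has a unique file id key.
  static String defaultFileId(String shortName, long chunkStartOffset) {
    if (chunkStartOffset == 0) {
      return shortName;
    }
    final String[] parts = shortName.split("\\.");
    if (parts.length != 2) {
      throw new IllegalStateException("Invalid file name format");
    }
    return String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1]);
  }

  public static void main(String[] args) {
    System.out.println(defaultFileId("a.bdec", 0));  // a.bdec
    System.out.println(defaultFileId("a.bdec", 13)); // a_13.bdec
  }
}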
@@ -508,7 +508,7 @@ && shouldStopProcessing(
try {
BlobMetadata blobMetadata =
buildAndUpload(
blobPath, blobData, fullyQualifiedTableName, encryptionKeysPerTable);
blobPath, Optional.empty(), blobData, fullyQualifiedTableName, encryptionKeysPerTable);
blobMetadata.getBlobStats().setFlushStartMs(flushStartMs);
return blobMetadata;
} catch (Throwable e) {
@@ -599,6 +599,7 @@ private boolean shouldStopProcessing(
*/
BlobMetadata buildAndUpload(
BlobPath blobPath,
Optional<String> customFileId,
List<List<ChannelData<T>>> blobData,
String fullyQualifiedTableName,
Map<FullyQualifiedTableName, EncryptionKey> encryptionKeysPerTable)
@@ -611,6 +612,7 @@ BlobMetadata buildAndUpload(
BlobBuilder.Blob blob =
BlobBuilder.constructBlobAndMetadata(
blobPath.fileRegistrationPath,
customFileId,
blobData,
bdecVersion,
this.owningClient.getInternalParameterProvider(),
@@ -24,11 +24,12 @@ public interface Flusher<T> {
* @param channelsDataPerTable buffered rows
* @param filePath file path
* @param chunkStartOffset offset at which this chunk starts within the blob
* @param fileId the file ID to be stored within the chunk
* @return {@link SerializationResult}
* @throws IOException
*/
SerializationResult serialize(
List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset)
List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset, String fileId)
throws IOException;

/** Holds result of the buffered rows conversion: channel metadata and stats. */
@@ -58,15 +58,17 @@ public ParquetFlusher(
public SerializationResult serialize(
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
String filePath,
long chunkStartOffset)
long chunkStartOffset,
String fileId)
throws IOException {
return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkStartOffset);
return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkStartOffset, fileId);
}

private SerializationResult serializeFromJavaObjects(
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
String filePath,
long chunkStartOffset)
long chunkStartOffset,
String fileId)
throws IOException {
List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
long rowCount = 0L;
@@ -91,11 +93,12 @@ private SerializationResult serializeFromJavaObjects(
channelsMetadataList.add(channelMetadata);

logger.logDebug(
"Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}",
"Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}, fileId={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
filePath);
filePath,
fileId);

if (rows == null) {
columnEpStatsMapCombined = data.getColumnEps();
@@ -124,15 +127,16 @@ private SerializationResult serializeFromJavaObjects(
chunkEstimatedUncompressedSize += data.getBufferSize();

logger.logDebug(
"Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
"Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}, fileId={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
filePath);
filePath,
fileId);
}

Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
addFileIdToMetadata(filePath, chunkStartOffset, metadata);
addFileIdToMetadata(fileId, chunkStartOffset, metadata);
parquetWriter =
new SnowflakeParquetWriter(
mergedData,
@@ -160,29 +164,18 @@
}

private void addFileIdToMetadata(
String filePath, long chunkStartOffset, Map<String, String> metadata) {
String fileId, long chunkStartOffset, Map<String, String> metadata) {
// We insert the filename in the file itself as metadata so that streams can work on replicated
// mixed tables. For a more detailed discussion on the topic see SNOW-561447 and
// http://go/streams-on-replicated-mixed-tables, and
// http://go/managed-iceberg-replication-change-tracking
// Using chunk offset as suffix ensures that for interleaved tables, the file
// id key is unique for each chunk. Each chunk is logically a separate Parquet file that happens
// to be bundled together.
if (chunkStartOffset == 0) {
metadata.put(
enableIcebergStreaming
? Constants.ASSIGNED_FULL_FILE_NAME_KEY
: Constants.PRIMARY_FILE_ID_KEY,
StreamingIngestUtils.getShortname(filePath));
} else {
// Each chunk is logically a separate Parquet file that happens to be bundled together.
if (enableIcebergStreaming) {
Preconditions.checkState(
!enableIcebergStreaming, "Iceberg streaming is not supported with non-zero offsets");
String shortName = StreamingIngestUtils.getShortname(filePath);
final String[] parts = shortName.split("\\.");
Preconditions.checkState(parts.length == 2, "Invalid file name format");
metadata.put(
Constants.PRIMARY_FILE_ID_KEY,
String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1]));
chunkStartOffset == 0, "Iceberg streaming is not supported with non-zero offsets");
metadata.put(Constants.ASSIGNED_FULL_FILE_NAME_KEY, fileId);
} else {
metadata.put(Constants.PRIMARY_FILE_ID_KEY, fileId);
}
}

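To restate the rewritten addFileIdToMetadata above: the flusher no longer derives a name from the path, it only decides which metadata key receives the fileId it was handed. The following self-contained sketch captures that branch; the key strings are stand-ins for Constants.ASSIGNED_FULL_FILE_NAME_KEY and Constants.PRIMARY_FILE_ID_KEY, whose concrete values are not shown in this diff.

import java.util.HashMap;
import java.util.Map;

class FileIdMetadataSketch {
  // Stand-in key names; the SDK uses Constants.ASSIGNED_FULL_FILE_NAME_KEY and
  // Constants.PRIMARY_FILE_ID_KEY, whose actual values are not part of this diff.
  static final String ASSIGNED_FULL_FILE_NAME_KEY = "assigned_full_file_name";
  static final String PRIMARY_FILE_ID_KEY = "primary_file_id";

  static void addFileIdToMetadata(
      boolean enableIcebergStreaming,
      String fileId,
      long chunkStartOffset,
      Map<String, String> metadata) {
    if (enableIcebergStreaming) {
      // Mirrors the Preconditions.checkState above: iceberg streaming rejects non-zero offsets.
      if (chunkStartOffset != 0) {
        throw new IllegalStateException("Iceberg streaming is not supported with non-zero offsets");
      }
      metadata.put(ASSIGNED_FULL_FILE_NAME_KEY, fileId);
    } else {
      metadata.put(PRIMARY_FILE_ID_KEY, fileId);
    }
  }

  public static void main(String[] args) {
    Map<String, String> metadata = new HashMap<>();
    addFileIdToMetadata(false, "a_13.bdec", 13, metadata);
    System.out.println(metadata); // {primary_file_id=a_13.bdec}
  }
}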
@@ -57,6 +57,7 @@ public void testSerializationErrors() throws Exception {
// Construction succeeds if both data and metadata contain 1 row
BlobBuilder.constructBlobAndMetadata(
"a.bdec",
Optional.empty(),
Collections.singletonList(createChannelDataPerTable(1)),
Constants.BdecVersion.THREE,
new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */),
@@ -66,6 +67,7 @@
try {
BlobBuilder.constructBlobAndMetadata(
"a.bdec",
Optional.empty(),
Collections.singletonList(createChannelDataPerTable(0)),
Constants.BdecVersion.THREE,
new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */),
@@ -91,6 +93,7 @@ public void testMetadataAndExtendedMetadataSize() throws Exception {
BlobBuilder.Blob blob =
BlobBuilder.constructBlobAndMetadata(
"a.parquet",
Optional.empty(),
Collections.singletonList(createChannelDataPerTable(1)),
Constants.BdecVersion.THREE,
new InternalParameterProvider(enableIcebergStreaming, false /* enableNDVCount */),
@@ -39,6 +39,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -167,6 +168,7 @@ BlobMetadata buildAndUpload() throws Exception {
List<List<ChannelData<T>>> blobData = Collections.singletonList(channelData);
return flushService.buildAndUpload(
new BlobPath("file_name" /* uploadPath */, "file_name" /* fileRegistrationPath */),
Optional.empty(),
blobData,
blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName(),
encryptionKeysPerTable);
@@ -657,7 +659,7 @@ public void testBlobCreation() throws Exception {
if (!enableIcebergStreaming) {
flushService.flush(true).get();
Mockito.verify(flushService, Mockito.atLeast(2))
.buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
.buildAndUpload(Mockito.any(), Mockito.eq(Optional.empty()), Mockito.any(), Mockito.any(), Mockito.any());
}
}

@@ -710,7 +712,7 @@ public void testBlobSplitDueToDifferentSchema() throws Exception {
// Force = true flushes
flushService.flush(true).get();
Mockito.verify(flushService, Mockito.atLeast(2))
.buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
.buildAndUpload(Mockito.any(), Mockito.eq(Optional.empty()), Mockito.any(), Mockito.any(), Mockito.any());
}
}

@@ -748,7 +750,7 @@ public void testBlobSplitDueToChunkSizeLimit() throws Exception {
// Force = true flushes
flushService.flush(true).get();
Mockito.verify(flushService, Mockito.times(2))
.buildAndUpload(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
.buildAndUpload(Mockito.any(), Mockito.eq(Optional.empty()), Mockito.any(), Mockito.any(), Mockito.any());
}
}

@@ -796,7 +798,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti
ArgumentCaptor<List<List<ChannelData<List<List<Object>>>>>> blobDataCaptor =
ArgumentCaptor.forClass(List.class);
Mockito.verify(flushService, Mockito.times(expectedBlobs))
.buildAndUpload(Mockito.any(), blobDataCaptor.capture(), Mockito.any(), Mockito.any());
.buildAndUpload(Mockito.any(), Mockito.eq(Optional.empty()), blobDataCaptor.capture(), Mockito.any(), Mockito.any());

// 1. list => blobs; 2. list => chunks; 3. list => channels; 4. list => rows, 5. list => columns
List<List<List<ChannelData<List<List<Object>>>>>> allUploadedBlobs =
@@ -844,7 +846,7 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Except
ArgumentCaptor<List<List<ChannelData<List<List<Object>>>>>> blobDataCaptor =
ArgumentCaptor.forClass(List.class);
Mockito.verify(flushService, Mockito.atLeast(2))
.buildAndUpload(Mockito.any(), blobDataCaptor.capture(), Mockito.any(), Mockito.any());
.buildAndUpload(Mockito.any(), Mockito.eq(Optional.empty()), blobDataCaptor.capture(), Mockito.any(), Mockito.any());

// 1. list => blobs; 2. list => chunks; 3. list => channels; 4. list => rows, 5. list => columns
List<List<List<ChannelData<List<List<Object>>>>>> allUploadedBlobs =
@@ -2306,6 +2306,7 @@ public void testParquetChunkMetadataCreationIsThreadSafe() throws InterruptedExc
@Test
public void testParquetFileNameMetadata() throws IOException {
String filePath = "testParquetFileNameMetadata.bdec";
String fileId = filePath;
final ParquetRowBuffer bufferUnderTest =
(ParquetRowBuffer) createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE);

@@ -2327,7 +2328,7 @@ public void testParquetFileNameMetadata() throws IOException {
ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher();
{
Flusher.SerializationResult result =
flusher.serialize(Collections.singletonList(data), filePath, 0);
flusher.serialize(Collections.singletonList(data), filePath, 0, fileId);

BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
Assert.assertEquals(
@@ -2345,15 +2346,16 @@
{
try {
Flusher.SerializationResult result =
flusher.serialize(Collections.singletonList(data), filePath, 13);
flusher.serialize(Collections.singletonList(data), filePath, 13, fileId);
if (enableIcebergStreaming) {
Assert.fail(
"Should have thrown an exception because iceberg streams do not support offsets");
}

BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
Assert.assertEquals(
"testParquetFileNameMetadata_13.bdec",
// NB the file ID passed to `serialize` would normally reflect the offset, but here it's a static value.
"testParquetFileNameMetadata.bdec",
reader
.getKeyValueMetadata()
.get(