[C++][Python][Parquet] Implement Content-Defined Chunking for the Parquet writer #45360

Draft · wants to merge 10 commits into base: main
1 change: 1 addition & 0 deletions cpp/src/parquet/CMakeLists.txt
@@ -390,6 +390,7 @@ add_parquet_test(reader-test

add_parquet_test(writer-test
SOURCES
column_chunker_test.cc
column_writer_test.cc
file_serialize_test.cc
stream_writer_test.cc)
784 changes: 784 additions & 0 deletions cpp/src/parquet/column_chunker.h

Large diffs are not rendered by default.
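The 784-line column_chunker.h, which holds the FastCDC chunker itself, is not rendered in this view. Purely for orientation, below is a minimal, generic sketch of the FastCDC/gear-hash idea the class name points to: a boundary is declared wherever a rolling hash of the content matches a fixed bit pattern, so identical content keeps producing identical chunk boundaries even when data is inserted or deleted earlier in the stream. This is not the PR's implementation — the real chunker operates on definition/repetition levels and Arrow arrays rather than raw bytes, and its actual interface is only visible at the call sites in column_writer.cc below; the function here is an illustrative stand-in.

#include <cstddef>
#include <cstdint>
#include <random>
#include <vector>

// One pseudo-random 64-bit "gear" value per possible byte value.
static std::vector<uint64_t> MakeGearTable() {
  std::mt19937_64 rng(/*seed=*/0);
  std::vector<uint64_t> table(256);
  for (auto& v : table) v = rng();
  return table;
}

// Returns the offsets at which chunks end. A boundary is declared whenever the
// rolling gear hash has its top `mask_bits` bits all zero, which happens on
// average once every 2^mask_bits bytes, so mask_bits controls the average
// chunk size.
std::vector<size_t> ChunkBoundaries(const uint8_t* data, size_t size, int mask_bits) {
  static const std::vector<uint64_t> kGear = MakeGearTable();
  const uint64_t mask = ((uint64_t{1} << mask_bits) - 1) << (64 - mask_bits);
  std::vector<size_t> boundaries;
  uint64_t hash = 0;
  for (size_t i = 0; i < size; ++i) {
    hash = (hash << 1) + kGear[data[i]];  // roll the hash forward by one byte
    if ((hash & mask) == 0) {
      boundaries.push_back(i + 1);  // chunk ends after byte i
      hash = 0;                     // restart for the next chunk
    }
  }
  boundaries.push_back(size);  // final (possibly short) chunk
  return boundaries;
}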

16 changes: 16 additions & 0 deletions cpp/src/parquet/column_chunker_test.cc
@@ -0,0 +1,16 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
43 changes: 37 additions & 6 deletions cpp/src/parquet/column_writer.cc
@@ -45,6 +45,7 @@
#include "arrow/util/rle_encoding_internal.h"
#include "arrow/util/type_traits.h"
#include "arrow/visit_array_inline.h"
#include "parquet/column_chunker.h"
#include "parquet/column_page.h"
#include "parquet/encoding.h"
#include "parquet/encryption/encryption_internal.h"
@@ -752,7 +753,8 @@ class ColumnWriterImpl {
closed_(false),
fallback_(false),
definition_levels_sink_(allocator_),
repetition_levels_sink_(allocator_) {
repetition_levels_sink_(allocator_),
content_defined_chunker_(level_info_, properties->cdc_avg_size()) {
definition_levels_rle_ =
std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
repetition_levels_rle_ =
@@ -892,6 +894,8 @@

std::vector<std::unique_ptr<DataPage>> data_pages_;

internal::FastCDC content_defined_chunker_;

private:
void InitSinks() {
definition_levels_sink_.Rewind(0);
@@ -1332,13 +1336,40 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
bits_buffer_->ZeroPadding();
}

if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) {
return WriteArrowDictionary(def_levels, rep_levels, num_levels, leaf_array, ctx,
maybe_parent_nulls);
if (this->properties_->cdc_enabled()) {
ARROW_ASSIGN_OR_RAISE(auto boundaries,
content_defined_chunker_.GetBoundaries(
def_levels, rep_levels, num_levels, leaf_array));
for (auto boundary : boundaries) {
auto level_offset = std::get<0>(boundary);
auto array_offset = std::get<1>(boundary);
auto levels_to_write = std::get<2>(boundary);
auto sliced_array = leaf_array.Slice(array_offset);
if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) {
ARROW_CHECK_OK(WriteArrowDictionary(def_levels + level_offset,
rep_levels + level_offset, levels_to_write,
*sliced_array, ctx, maybe_parent_nulls));
} else {
ARROW_CHECK_OK(WriteArrowDense(def_levels + level_offset,
rep_levels + level_offset, levels_to_write,
*sliced_array, ctx, maybe_parent_nulls));
}
if (num_buffered_values_ > 0) {
AddDataPage();
}
// AddDataPage();
}
return Status::OK();
} else {
return WriteArrowDense(def_levels, rep_levels, num_levels, leaf_array, ctx,
maybe_parent_nulls);
if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) {
return WriteArrowDictionary(def_levels, rep_levels, num_levels, leaf_array, ctx,
maybe_parent_nulls);
} else {
return WriteArrowDense(def_levels, rep_levels, num_levels, leaf_array, ctx,
maybe_parent_nulls);
}
}

END_PARQUET_CATCH_EXCEPTIONS
}

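For readers skimming the hunk above: when CDC is enabled, the writer asks the chunker for content-defined boundaries and then writes one array slice per chunk, calling AddDataPage() after each one so that data-page boundaries coincide with chunk boundaries. The boundaries are consumed as (level_offset, array_offset, levels_to_write) tuples; a hypothetical GetBoundaries() result for a flat column whose 10 levels get split into chunks of 4, 3 and 3 values would look like this (the 4/3/3 split is invented for illustration):

#include <cstdint>
#include <tuple>
#include <vector>

// Hypothetical chunker output for a flat column with 10 levels split into
// chunks of 4, 3 and 3 values. Each tuple is
// (level_offset, array_offset, levels_to_write); for a flat column the level
// offset and the array offset coincide.
std::vector<std::tuple<int64_t, int64_t, int64_t>> boundaries = {
    {0, 0, 4}, {4, 4, 3}, {7, 7, 3}};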
1 change: 1 addition & 0 deletions cpp/src/parquet/column_writer.h
@@ -23,6 +23,7 @@

#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
#include "parquet/column_chunker.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
#include "parquet/types.h"
44 changes: 39 additions & 5 deletions cpp/src/parquet/properties.h
@@ -27,6 +27,7 @@
#include "arrow/type.h"
#include "arrow/util/compression.h"
#include "arrow/util/type_fwd.h"
#include "parquet/column_chunker.h"
#include "parquet/encryption/encryption.h"
#include "parquet/exception.h"
#include "parquet/parquet_version.h"
@@ -260,7 +261,9 @@ class PARQUET_EXPORT WriterProperties {
created_by_(DEFAULT_CREATED_BY),
store_decimal_as_integer_(false),
page_checksum_enabled_(false),
size_statistics_level_(DEFAULT_SIZE_STATISTICS_LEVEL) {}
size_statistics_level_(DEFAULT_SIZE_STATISTICS_LEVEL),
cdc_enabled_(false),
cdc_avg_size_(0) {}

explicit Builder(const WriterProperties& properties)
: pool_(properties.memory_pool()),
Expand All @@ -275,10 +278,27 @@ class PARQUET_EXPORT WriterProperties {
page_checksum_enabled_(properties.page_checksum_enabled()),
size_statistics_level_(properties.size_statistics_level()),
sorting_columns_(properties.sorting_columns()),
default_column_properties_(properties.default_column_properties()) {}
default_column_properties_(properties.default_column_properties()),
cdc_enabled_(properties.cdc_enabled()),
cdc_avg_size_(properties.cdc_avg_size()) {}

virtual ~Builder() {}

Builder* enable_cdc() {
cdc_enabled_ = true;
return this;
}

Builder* disable_cdc() {
cdc_enabled_ = false;
return this;
}

Builder* cdc_avg_size(uint64_t avg_size) {
cdc_avg_size_ = avg_size;
return this;
}

/// Specify the memory pool for the writer. Default default_memory_pool.
Builder* memory_pool(MemoryPool* pool) {
pool_ = pool;
@@ -701,7 +721,8 @@ class PARQUET_EXPORT WriterProperties {
pagesize_, version_, created_by_, page_checksum_enabled_,
size_statistics_level_, std::move(file_encryption_properties_),
default_column_properties_, column_properties, data_page_version_,
store_decimal_as_integer_, std::move(sorting_columns_)));
store_decimal_as_integer_, std::move(sorting_columns_), cdc_enabled_,
cdc_avg_size_));
}

private:
@@ -730,6 +751,9 @@ class PARQUET_EXPORT WriterProperties {
std::unordered_map<std::string, bool> dictionary_enabled_;
std::unordered_map<std::string, bool> statistics_enabled_;
std::unordered_map<std::string, bool> page_index_enabled_;

bool cdc_enabled_;
uint64_t cdc_avg_size_;
};

inline MemoryPool* memory_pool() const { return pool_; }
@@ -754,6 +778,9 @@

inline bool page_checksum_enabled() const { return page_checksum_enabled_; }

inline bool cdc_enabled() const { return cdc_enabled_; }
inline uint64_t cdc_avg_size() const { return cdc_avg_size_; }

inline SizeStatisticsLevel size_statistics_level() const {
return size_statistics_level_;
}
@@ -856,7 +883,7 @@ class PARQUET_EXPORT WriterProperties {
const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>& column_properties,
ParquetDataPageVersion data_page_version, bool store_short_decimal_as_integer,
std::vector<SortingColumn> sorting_columns)
std::vector<SortingColumn> sorting_columns, bool cdc_enabled, uint64_t cdc_avg_size)
: pool_(pool),
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
@@ -871,7 +898,11 @@
file_encryption_properties_(file_encryption_properties),
sorting_columns_(std::move(sorting_columns)),
default_column_properties_(default_column_properties),
column_properties_(column_properties) {}
column_properties_(column_properties),
cdc_enabled_(cdc_enabled),
cdc_avg_size_(cdc_avg_size)

{}

MemoryPool* pool_;
int64_t dictionary_pagesize_limit_;
Expand All @@ -891,6 +922,9 @@ class PARQUET_EXPORT WriterProperties {

ColumnProperties default_column_properties_;
std::unordered_map<std::string, ColumnProperties> column_properties_;

bool cdc_enabled_;
uint64_t cdc_avg_size_;
};

PARQUET_EXPORT const std::shared_ptr<WriterProperties>& default_writer_properties();
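Taken together, the builder additions above give a per-writer switch plus a target average chunk size. A minimal C++ usage sketch of the new options follows; the 1 MiB value is an arbitrary example, not a recommended default.

#include <memory>

#include "parquet/properties.h"

std::shared_ptr<parquet::WriterProperties> MakeCdcWriterProperties() {
  parquet::WriterProperties::Builder builder;
  builder.enable_cdc()                // opt in to content-defined chunking
      ->cdc_avg_size(1024 * 1024);    // aim for ~1 MiB chunks on average
  return builder.build();
}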
4 changes: 4 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -446,6 +446,9 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* disable_write_page_index()
Builder* enable_page_checksum()
Builder* disable_page_checksum()
Builder* enable_cdc()
Builder* disable_cdc()
Builder* cdc_avg_size(uint64_t avg_size)
shared_ptr[WriterProperties] build()

cdef cppclass ArrowWriterProperties:
Expand Down Expand Up @@ -597,6 +600,7 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
write_page_checksum=*,
sorting_columns=*,
store_decimal_as_integer=*,
content_defined_chunking=*
) except *


Expand Down
24 changes: 22 additions & 2 deletions python/pyarrow/_parquet.pyx
@@ -1851,7 +1851,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
write_page_index=False,
write_page_checksum=False,
sorting_columns=None,
store_decimal_as_integer=False) except *:
store_decimal_as_integer=False,
content_defined_chunking=False) except *:

"""General writer properties"""
cdef:
@@ -1987,6 +1988,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
raise TypeError(
"'column_encoding' should be a dictionary or a string")

# size limits

if data_page_size is not None:
props.data_pagesize(data_page_size)

@@ -1996,6 +1999,21 @@
if dictionary_pagesize_limit is not None:
props.dictionary_pagesize_limit(dictionary_pagesize_limit)

# content defined chunking

if content_defined_chunking is False:
props.disable_cdc()
elif content_defined_chunking is True:
props.enable_cdc()
elif isinstance(content_defined_chunking, tuple):
avg_size, = content_defined_chunking
props.enable_cdc()
props.cdc_avg_size(avg_size)
else:
raise ValueError(
"Unsupported value for content_defined_chunking: {0}"
.format(content_defined_chunking))

# encryption

if encryption_properties is not None:
@@ -2169,7 +2187,8 @@ cdef class ParquetWriter(_Weakrefable):
write_page_index=False,
write_page_checksum=False,
sorting_columns=None,
store_decimal_as_integer=False):
store_decimal_as_integer=False,
content_defined_chunking=False):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -2204,6 +2223,7 @@
write_page_checksum=write_page_checksum,
sorting_columns=sorting_columns,
store_decimal_as_integer=store_decimal_as_integer,
content_defined_chunking=content_defined_chunking
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
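On the Python side, _create_writer_properties maps False to disable_cdc(), True to enable_cdc(), and a single-element tuple to enable_cdc() plus cdc_avg_size(). Assuming the keyword is forwarded from the high-level pyarrow.parquet.ParquetWriter to the Cython writer the same way the other writer options are (that plumbing is not part of the rendered diff), usage would look roughly like this:

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"x": list(range(1_000_000))})

# Enable content-defined chunking without specifying an average chunk size.
with pq.ParquetWriter("cdc.parquet", table.schema,
                      content_defined_chunking=True) as writer:
    writer.write_table(table)

# Enable it and request ~256 KiB average chunks; the single-element tuple
# matches the `avg_size, = content_defined_chunking` unpacking above.
with pq.ParquetWriter("cdc_256k.parquet", table.schema,
                      content_defined_chunking=(256 * 1024,)) as writer:
    writer.write_table(table)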