diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index dc7d40d2a38..58cef4b6081 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -171,6 +171,7 @@ set(PARQUET_SRCS encryption/internal_file_encryptor.cc exception.cc file_reader.cc + file_rewriter.cc file_writer.cc geospatial/statistics.cc geospatial/util_internal.cc @@ -408,6 +409,8 @@ add_parquet_test(arrow-reader-writer-test arrow/arrow_statistics_test.cc arrow/variant_test.cc) +add_parquet_test(arrow-rewriter-test SOURCES arrow/arrow_rewriter_test.cc) + add_parquet_test(arrow-internals-test SOURCES arrow/path_internal_test.cc arrow/reconstruct_internal_test.cc) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index e081b428e24..568f3c50f21 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -375,19 +375,19 @@ const double test_traits<::arrow::DoubleType>::value(4.2); template <> struct test_traits<::arrow::StringType> { static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; - static std::string const value; + static const std::string value; }; template <> struct test_traits<::arrow::BinaryType> { static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; - static std::string const value; + static const std::string value; }; template <> struct test_traits<::arrow::FixedSizeBinaryType> { static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY; - static std::string const value; + static const std::string value; }; const std::string test_traits<::arrow::StringType>::value("Test"); // NOLINT @@ -5906,28 +5906,6 @@ auto encode_double = [](double value) { class ParquetPageIndexRoundTripTest : public ::testing::Test { public: - void WriteFile(const std::shared_ptr& writer_properties, - const std::shared_ptr<::arrow::Table>& table) { - // Get schema from table. - auto schema = table->schema(); - std::shared_ptr parquet_schema; - auto arrow_writer_properties = default_arrow_writer_properties(); - ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties, - *arrow_writer_properties, &parquet_schema)); - auto schema_node = std::static_pointer_cast(parquet_schema->schema_root()); - - // Write table to buffer. - auto sink = CreateOutputStream(); - auto pool = ::arrow::default_memory_pool(); - auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); - std::unique_ptr arrow_writer; - ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties, - &arrow_writer)); - ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); - ASSERT_OK_NO_THROW(arrow_writer->Close()); - ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish()); - } - void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages, const std::set& expect_columns_without_index = {}) { auto read_properties = default_arrow_reader_properties(); @@ -6015,7 +5993,8 @@ TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTrip) { [null, "d", [] ], [5, null, [3, 3, 3]], [6, "f", null ] - ])"})); + ])"}), + buffer_); ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); @@ -6057,7 +6036,8 @@ TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithStatsDisabled) { [null, "d", [] ], [5, null, [3, 3, 3]], [6, "f", null ] - ])"})); + ])"}), + buffer_); ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1); for (auto& column_index : column_indexes_) { @@ -6082,7 +6062,8 @@ TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithColumnStatsDisabled) { [null, "d", [] ], [5, null, [3, 3, 3]], [6, "f", null ] - ])"})); + ])"}), + buffer_); ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); @@ -6116,7 +6097,8 @@ TEST_F(ParquetPageIndexRoundTripTest, DropLargeStats) { WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([ ["short_string"], ["very_large_string_to_drop_stats"] - ])"})); + ])"}), + buffer_); ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1); @@ -6140,7 +6122,8 @@ TEST_F(ParquetPageIndexRoundTripTest, MultiplePages) { writer_properties, ::arrow::TableFromJSON( schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])", - R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"})); + R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"}), + buffer_); ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/4); @@ -6180,7 +6163,7 @@ TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) { auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::float64())}); auto table = Table::Make(schema, {chunked_array}); - WriteFile(writer_properties, table); + WriteFile(writer_properties, table, buffer_); ReadPageIndexes(/*expect_num_row_groups=*/4, /*expect_num_pages=*/1); @@ -6215,7 +6198,8 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) { ->enable_write_page_index("c0") /* enable c0 explicitly */ ->disable_write_page_index("c1") /* disable c1 explicitly */ ->build(); - WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([[0, 1, 2]])"})); + WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([[0, 1, 2]])"}), + buffer_); ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1, /*expect_columns_without_index=*/{1}); diff --git a/cpp/src/parquet/arrow/arrow_rewriter_test.cc b/cpp/src/parquet/arrow/arrow_rewriter_test.cc new file mode 100644 index 00000000000..ea4125f5e2e --- /dev/null +++ b/cpp/src/parquet/arrow/arrow_rewriter_test.cc @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/io/memory.h" +#include "arrow/testing/gtest_util.h" +#include "parquet/arrow/reader.h" +#include "parquet/file_reader.h" +#include "parquet/file_rewriter.h" +#ifdef _MSC_VER +# pragma warning(push) +// Disable forcing value to bool warnings +# pragma warning(disable : 4800) +#endif + +#include + +#include "gtest/gtest.h" + +#include "parquet/arrow/test_util.h" +#include "parquet/platform.h" +#include "parquet/properties.h" + +using arrow::Table; +using arrow::io::BufferReader; + +namespace parquet::arrow { + +TEST(ParquetRewriterTest, ConcatRoundTrip) { + auto rewriter_properties = + RewriterProperties::Builder() + .writer_properties( + WriterProperties::Builder().enable_write_page_index()->build()) + ->build(); + + auto schema = ::arrow::schema( + {::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())}); + + std::shared_ptr buffer_up; + std::shared_ptr buffer_down; + + WriteFile(rewriter_properties->writer_properties(), + ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"]])"}), buffer_up); + WriteFile(rewriter_properties->writer_properties(), + ::arrow::TableFromJSON(schema, {R"([[3, "c"]])"}), buffer_down); + + auto sink = CreateOutputStream(); + auto rewriter = + ParquetFileRewriter::Open({{std::make_shared(buffer_up), + std::make_shared(buffer_down)}}, + sink, {{NULLPTR, NULLPTR}}, NULLPTR, rewriter_properties); + rewriter->Rewrite(); + rewriter->Close(); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr out_buffer, sink->Finish()); + auto file_reader = ParquetFileReader::Open(std::make_shared(out_buffer)); + std::unique_ptr reader; + ASSERT_OK_NO_THROW( + FileReader::Make(::arrow::default_memory_pool(), std::move(file_reader), &reader)); + + std::shared_ptr table; + ASSERT_OK(reader->ReadTable(&table)); + ASSERT_OK(table->ValidateFull()); + + auto expected_table = + ::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"], [3, "c"]])"}); + AssertTablesEqual(*expected_table, *table); +} + +TEST(ParquetRewriterTest, DISABLED_ExtendRoundTrip) { + auto rewriter_properties = + RewriterProperties::Builder() + .writer_properties( + WriterProperties::Builder().enable_write_page_index()->build()) + ->build(); + + auto left_schema = ::arrow::schema( + {::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())}); + auto right_schema = ::arrow::schema({::arrow::field("c", ::arrow::int64())}); + + std::shared_ptr buffer_left; + std::shared_ptr buffer_right; + + WriteFile(rewriter_properties->writer_properties(), + ::arrow::TableFromJSON(left_schema, {R"([[1, "a"], [2, "b"], [3, "c"]])"}), + buffer_left); + WriteFile(rewriter_properties->writer_properties(), + ::arrow::TableFromJSON(right_schema, {R"([[10], [20], [30]])"}), + buffer_right); + + auto sink = CreateOutputStream(); + auto rewriter = ParquetFileRewriter::Open( + {{std::make_shared(buffer_left)}, + {std::make_shared(buffer_right)}}, + sink, {{NULLPTR}, {NULLPTR}}, NULLPTR, rewriter_properties); + rewriter->Rewrite(); + rewriter->Close(); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr out_buffer, sink->Finish()); + auto file_reader = ParquetFileReader::Open(std::make_shared(out_buffer)); + std::unique_ptr reader; + ASSERT_OK_NO_THROW( + FileReader::Make(::arrow::default_memory_pool(), std::move(file_reader), &reader)); + + std::shared_ptr
table; + ASSERT_OK(reader->ReadTable(&table)); + ASSERT_OK(table->ValidateFull()); + + auto expected_schema = ::arrow::schema({::arrow::field("a", ::arrow::int32()), + ::arrow::field("b", ::arrow::utf8()), + ::arrow::field("c", ::arrow::int64())}); + auto expected_table = ::arrow::TableFromJSON( + expected_schema, {R"([[1, "a", 10], [2, "b", 20], [3, "c", 30]])"}); + AssertTablesEqual(*expected_table, *table); +} + +TEST(ParquetRewriterTest, DISABLED_SimpleRoundTrip) { + auto rewriter_properties = RewriterProperties::Builder() + .writer_properties(WriterProperties::Builder() + .enable_write_page_index() + ->max_row_group_length(1) + ->build()) + ->build(); + + auto left_schema = ::arrow::schema( + {::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())}); + auto right_schema = ::arrow::schema({::arrow::field("c", ::arrow::int64())}); + + std::shared_ptr buffer_left_up; + std::shared_ptr buffer_left_down; + std::shared_ptr buffer_right_up; + std::shared_ptr buffer_right_down; + + WriteFile(rewriter_properties->writer_properties(), + ::arrow::TableFromJSON(left_schema, {R"([[1, "a"], [2, "b"]])"}), + buffer_left_up); + WriteFile(rewriter_properties->writer_properties(), + ::arrow::TableFromJSON(left_schema, {R"([[3, "c"]])"}), buffer_left_down); + WriteFile(rewriter_properties->writer_properties(), + ::arrow::TableFromJSON(right_schema, {R"([[10]])"}), buffer_right_up); + WriteFile(rewriter_properties->writer_properties(), + ::arrow::TableFromJSON(right_schema, {R"([[20], [30]])"}), buffer_right_down); + + auto sink = CreateOutputStream(); + auto rewriter = ParquetFileRewriter::Open( + {{std::make_shared(buffer_left_up), + std::make_shared(buffer_left_down)}, + {std::make_shared(buffer_right_up), + std::make_shared(buffer_right_down)}}, + sink, {{NULLPTR, NULLPTR}, {NULLPTR, NULLPTR}}, NULLPTR, rewriter_properties); + rewriter->Rewrite(); + rewriter->Close(); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr out_buffer, sink->Finish()); + auto file_reader = ParquetFileReader::Open(std::make_shared(out_buffer)); + std::unique_ptr reader; + ASSERT_OK_NO_THROW( + FileReader::Make(::arrow::default_memory_pool(), std::move(file_reader), &reader)); + + std::shared_ptr
table; + ASSERT_OK(reader->ReadTable(&table)); + ASSERT_OK(table->ValidateFull()); + + auto expected_schema = ::arrow::schema({::arrow::field("a", ::arrow::int32()), + ::arrow::field("b", ::arrow::utf8()), + ::arrow::field("c", ::arrow::int64())}); + auto expected_table = ::arrow::TableFromJSON( + expected_schema, {R"([[1, "a", 10], [2, "b", 20], [3, "c", 30]])"}); + AssertTablesEqual(*expected_table, *table); +} + +} // namespace parquet::arrow diff --git a/cpp/src/parquet/arrow/test_util.h b/cpp/src/parquet/arrow/test_util.h index 05f6fd24ac0..30d75b58e0a 100644 --- a/cpp/src/parquet/arrow/test_util.h +++ b/cpp/src/parquet/arrow/test_util.h @@ -28,18 +28,23 @@ #include "arrow/array/builder_binary.h" #include "arrow/array/builder_decimal.h" #include "arrow/array/builder_primitive.h" +#include "arrow/table.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" #include "arrow/type_fwd.h" #include "arrow/type_traits.h" #include "arrow/util/decimal.h" #include "arrow/util/float16.h" +#include "parquet/arrow/schema.h" +#include "parquet/arrow/writer.h" #include "parquet/column_reader.h" +#include "parquet/file_writer.h" #include "parquet/test_util.h" namespace parquet { using internal::RecordReader; +using schema::GroupNode; namespace arrow { @@ -482,6 +487,29 @@ void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) { EXPECT_TRUE(result->Equals(*expected_array)); } +void WriteFile(const std::shared_ptr& writer_properties, + const std::shared_ptr<::arrow::Table>& table, + std::shared_ptr& buffer) { + // Get schema from table. + auto schema = table->schema(); + std::shared_ptr parquet_schema; + auto arrow_writer_properties = default_arrow_writer_properties(); + ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = std::static_pointer_cast(parquet_schema->schema_root()); + + // Write table to buffer. + auto sink = CreateOutputStream(); + auto pool = ::arrow::default_memory_pool(); + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr arrow_writer; + ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties, + &arrow_writer)); + ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table)); + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(buffer, sink->Finish()); +} + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/file_rewriter.cc b/cpp/src/parquet/file_rewriter.cc new file mode 100644 index 00000000000..4deed78a4ba --- /dev/null +++ b/cpp/src/parquet/file_rewriter.cc @@ -0,0 +1,495 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/file_rewriter.h" + +#include +#include +#include +#include +#include + +#include "parquet/bloom_filter.h" +#include "parquet/bloom_filter_reader.h" +#include "parquet/column_reader.h" +#include "parquet/exception.h" +#include "parquet/file_reader.h" +#include "parquet/file_writer.h" +#include "parquet/metadata.h" +#include "parquet/page_index.h" +#include "parquet/platform.h" +#include "parquet/properties.h" + +namespace parquet { + +namespace { +void CopyStream(std::shared_ptr from, + std::shared_ptr to, int64_t size, + ::arrow::MemoryPool* pool) { + int64_t bytes_copied = 0; + if (from->supports_zero_copy()) { + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW(auto buffer, from->Read(size - bytes_copied)); + if (buffer->size() == 0) { + throw ParquetException("Unexpected end of stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), buffer->size())); + bytes_copied += buffer->size(); + } + return; + } + + std::shared_ptr buffer = + AllocateBuffer(pool, kDefaultOutputStreamSize); + while (bytes_copied < size) { + PARQUET_ASSIGN_OR_THROW(auto read_size, from->Read(size - bytes_copied, &buffer)); + if (read_size == 0) { + throw ParquetException("Unexpected end of stream at ", bytes_copied); + } + PARQUET_THROW_NOT_OK(to->Write(buffer->data(), read_size)); + bytes_copied += read_size; + } +} +} // namespace + +const std::shared_ptr& default_rewriter_properties() { + static std::shared_ptr default_rewriter_properties = + RewriterProperties::Builder().build(); + return default_rewriter_properties; +} + +class RowGroupRewriter { + public: + RowGroupRewriter(std::shared_ptr source, + std::shared_ptr sink, + const RewriterProperties* props, + std::shared_ptr row_group_reader, + std::shared_ptr page_index_reader, + std::shared_ptr bloom_filter_reader) + : source_(std::move(source)), + sink_(std::move(sink)), + props_(props), + row_group_reader_(std::move(row_group_reader)), + page_index_reader_(std::move(page_index_reader)), + bloom_filter_reader_(std::move(bloom_filter_reader)), + metadata_(row_group_reader_->metadata()) {} + + void WriteRowGroupData(bool fast_copy, RowGroupMetaDataBuilder* rg_metadata_builder, + PageIndexBuilder* page_index_builder, + int64_t& total_bytes_written) { + rg_metadata_builder->set_num_rows(metadata_->num_rows()); + if (fast_copy) { + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - metadata_->file_offset(); + + auto stream = props_->reader_properties().GetStream( + source_, metadata_->file_offset(), metadata_->total_compressed_size()); + CopyStream(stream, sink_, metadata_->total_compressed_size(), + props_->memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + rg_metadata_builder->NextColumnChunk(std::move(cc_metadata), shift); + + // TODO(HuaHuaY): copy column index and bloom filter instead of reading and + // writing it. + auto column_index = page_index_reader_->GetColumnIndex(i); + auto offset_index = page_index_reader_->GetOffsetIndex(i); + // TODO(HuaHuaY): support bloom filter writer + [[maybe_unused]] auto bloom_filter = + bloom_filter_reader_->GetColumnBloomFilter(i); + + page_index_builder->SetColumnIndex(i, column_index); + page_index_builder->SetOffsetIndex(i, offset_index, shift); + } + + total_bytes_written += metadata_->total_byte_size(); + } else { + for (int i = 0; i < metadata_->num_columns(); ++i) { + auto cc_metadata = metadata_->ColumnChunk(i); + + PARQUET_ASSIGN_OR_THROW(int64_t sink_offset, sink_->Tell()); + int64_t shift = sink_offset - cc_metadata->start_offset(); + + // TODO(HuaHuaY): add else branch to rewrite column chunk with new encoding, + // compression, etc. + if (true) { + // TODO(HuaHuaY): copy column index and bloom filter instead of reading and + // writing it. + auto column_index = page_index_reader_->GetColumnIndex(i); + auto offset_index = page_index_reader_->GetOffsetIndex(i); + // TODO(HuaHuaY): support bloom filter writer + [[maybe_unused]] auto bloom_filter = + bloom_filter_reader_->GetColumnBloomFilter(i); + + auto stream = props_->reader_properties().GetStream( + source_, cc_metadata->start_offset(), cc_metadata->total_compressed_size()); + CopyStream(stream, sink_, cc_metadata->total_compressed_size(), + props_->memory_pool()); + PARQUET_THROW_NOT_OK(stream->Close()); + + rg_metadata_builder->NextColumnChunk(std::move(cc_metadata), shift); + + page_index_builder->SetColumnIndex(i, column_index); + page_index_builder->SetOffsetIndex(i, offset_index, shift); + + total_bytes_written += cc_metadata->total_uncompressed_size(); + } + } + } + } + + private: + std::shared_ptr source_; + std::shared_ptr sink_; + const RewriterProperties* props_; + std::shared_ptr row_group_reader_; + std::shared_ptr page_index_reader_; + std::shared_ptr bloom_filter_reader_; + const RowGroupMetaData* metadata_; +}; + +class SingleFileRewriter { + public: + SingleFileRewriter(std::shared_ptr source, + std::shared_ptr sink, + std::shared_ptr source_metadata, + const RewriterProperties* props) + : source_(source), + sink_(std::move(sink)), + props_(props), + parquet_file_reader_(ParquetFileReader::Open( + std::move(source), props->reader_properties(), std::move(source_metadata))), + page_index_reader_(parquet_file_reader_->GetPageIndexReader()), + bloom_filter_reader_(parquet_file_reader_->GetBloomFilterReader()), + metadata_(parquet_file_reader_->metadata()) { + std::vector row_group_indices(metadata_->num_row_groups()); + std::iota(row_group_indices.begin(), row_group_indices.end(), 0); + std::vector column_indices(metadata_->num_columns()); + std::iota(column_indices.begin(), column_indices.end(), 0); + page_index_reader_->WillNeed(row_group_indices, column_indices, + {/*column_index=*/true, /*offset_index=*/true}); + } + + void WriteRowGroupData(bool fast_copy, RowGroupMetaDataBuilder* rg_metadata_builder, + PageIndexBuilder* page_index_builder, + int64_t& total_bytes_written) { + if (current_row_group_index_ >= metadata_->num_row_groups()) { + std::stringstream ss; + ss << "Trying to read row group " << current_row_group_index_ + << " but file only has " << metadata_->num_row_groups() << " row groups"; + throw ParquetException(ss.str()); + } + auto row_group_metadata = metadata_->RowGroup(current_row_group_index_); + auto row_group_reader = parquet_file_reader_->RowGroup(current_row_group_index_); + auto page_index_reader = page_index_reader_->RowGroup(current_row_group_index_); + auto bloom_filter_reader = bloom_filter_reader_.RowGroup(current_row_group_index_); + RowGroupRewriter rewriter(source_, sink_, props_, std::move(row_group_reader), + std::move(page_index_reader), + std::move(bloom_filter_reader)); + rewriter.WriteRowGroupData(fast_copy && row_group_metadata->file_offset() != 0, + rg_metadata_builder, page_index_builder, + total_bytes_written); + ++current_row_group_index_; + } + + bool HasMoreRowGroup() { + return current_row_group_index_ < metadata_->num_row_groups(); + } + + void Close() { parquet_file_reader_->Close(); } + + const SchemaDescriptor* schema() const { return metadata_->schema(); } + + std::vector row_group_row_counts() const { + int num_row_groups = metadata_->num_row_groups(); + std::vector row_counts; + row_counts.reserve(num_row_groups); + for (int i = 0; i < num_row_groups; ++i) { + row_counts.emplace_back(metadata_->RowGroup(i)->num_rows()); + } + return row_counts; + } + + private: + std::shared_ptr source_; + std::shared_ptr sink_; + const RewriterProperties* props_; + std::unique_ptr parquet_file_reader_; + std::shared_ptr page_index_reader_; + BloomFilterReader& bloom_filter_reader_; + std::shared_ptr metadata_; + int current_row_group_index_{}; +}; + +class ConcatRewriter { + public: + explicit ConcatRewriter(std::vector> rewriters) + : file_rewriters_(std::move(rewriters)) { + auto* schema = file_rewriters_[0]->schema(); + for (size_t i = 1; i < file_rewriters_.size(); ++i) { + if (!schema->Equals(*file_rewriters_[i]->schema())) { + throw ParquetException("Input files have different schemas, current index: ", i); + } + } + } + + void WriteRowGroupData(bool fast_copy, RowGroupMetaDataBuilder* rg_metadata_builder, + PageIndexBuilder* page_index_builder, + int64_t& total_bytes_written) { + file_rewriters_[current_rewriter_index_]->WriteRowGroupData( + fast_copy, rg_metadata_builder, page_index_builder, total_bytes_written); + } + + bool HasMoreRowGroup() { + while (current_rewriter_index_ < file_rewriters_.size() && + !file_rewriters_[current_rewriter_index_]->HasMoreRowGroup()) { + file_rewriters_[current_rewriter_index_]->Close(); + ++current_rewriter_index_; + } + return current_rewriter_index_ < file_rewriters_.size(); + } + + void Close() { + for (size_t i = current_rewriter_index_; i < file_rewriters_.size(); ++i) { + file_rewriters_[i]->Close(); + } + } + + const SchemaDescriptor* schema() const { return file_rewriters_[0]->schema(); } + + std::vector row_group_row_counts() const { + std::vector row_counts; + for (auto& rewriter : file_rewriters_) { + auto count = rewriter->row_group_row_counts(); + row_counts.insert(row_counts.end(), count.begin(), count.end()); + } + return row_counts; + } + + private: + std::vector> file_rewriters_; + size_t current_rewriter_index_{}; +}; + +class ExtendRewriter { + public: + // TODO(HuaHuaY): use type like `std::vector>>` to handle batch stream in memory. + explicit ExtendRewriter(std::vector> rewriters) + : rewriters_(std::move(rewriters)) { + std::unordered_set column_paths; + for (auto& rewriter : rewriters_) { + auto schema = rewriter->schema(); + for (int i = 0; i < schema->num_columns(); ++i) { + auto path = schema->Column(i)->path()->ToDotString(); + // TODO(HuaHuaY): give an option about keeping which column. + if (column_paths.find(path) != column_paths.end()) { + throw ParquetException("NotImplemented, files have same column path: ", path); + } + column_paths.emplace(std::move(path)); + } + } + auto row_counts = rewriters_[0]->row_group_row_counts(); + for (size_t i = 1; i < rewriters_.size(); ++i) { + if (auto current_row_counts = rewriters_[i]->row_group_row_counts(); + row_counts != current_row_counts) { + auto vecToString = [](const std::vector& v) { + std::ostringstream oss; + oss << "["; + for (size_t i = 0; i < v.size(); ++i) { + oss << v[i] << (i + 1 < v.size() ? ", " : ""); + } + oss << "]"; + return oss.str(); + }; + throw ParquetException( + "The number of rows in each block must match! No.0 blocks row counts: ", + vecToString(row_counts), ", No.", i, + " blocks row counts: ", vecToString(current_row_counts)); + } + } + } + + void WriteRowGroupData(RowGroupMetaDataBuilder* rg_metadata_builder, + PageIndexBuilder* page_index_builder, + int64_t& total_bytes_written) { + if (rewriters_.size() == 1) { + rewriters_[0]->WriteRowGroupData(true, rg_metadata_builder, page_index_builder, + total_bytes_written); + return; + } + for (auto& rewriter : rewriters_) { + rewriter->WriteRowGroupData(false, rg_metadata_builder, page_index_builder, + total_bytes_written); + } + } + + bool HasMoreRowGroup() { return rewriters_[0]->HasMoreRowGroup(); } + + void Close() { + for (auto& rewriter : rewriters_) { + rewriter->Close(); + } + } + + const SchemaDescriptor* schema() const { + // TODO(HuaHuaY): support file joining later. + if (rewriters_.size() > 1) { + throw ParquetException("NotImplemented, only support one ConcatRewriter now."); + } + return rewriters_[0]->schema(); + } + + private: + std::vector> rewriters_; +}; + +// ---------------------------------------------------------------------- +// GeneratedFile + +class GeneratedFile : public ParquetFileRewriter::Contents { + public: + static std::unique_ptr Open( + std::vector>> sources, + std::shared_ptr sink, + std::vector>> sources_metadata, + std::shared_ptr sink_metadata, + std::shared_ptr props) { + std::unique_ptr result(new GeneratedFile( + std::move(sources), std::move(sink), std::move(sources_metadata), + std::move(sink_metadata), std::move(props))); + return result; + } + + void Close() override { + if (rewriter_) { + rewriter_->Close(); + rewriter_.reset(); + } + } + + void Rewrite() override { + while (rewriter_->HasMoreRowGroup()) { + auto* rg_metadata_builder = metadata_builder_->AppendRowGroup(); + page_index_builder_->AppendRowGroup(); + int64_t total_bytes_written = 0; + rewriter_->WriteRowGroupData(rg_metadata_builder, page_index_builder_.get(), + total_bytes_written); + rg_metadata_builder->Finish(total_bytes_written); + } + page_index_builder_->Finish(); + PageIndexLocation location; + page_index_builder_->WriteTo(sink_.get(), &location); + metadata_builder_->SetPageIndexLocation(location); + + auto file_metadata = metadata_builder_->Finish(sink_metadata_); + WriteFileMetaData(*file_metadata, sink_.get()); + } + + private: + GeneratedFile(std::vector>> sources, + std::shared_ptr sink, + std::vector>> sources_metadata, + std::shared_ptr sink_metadata, + std::shared_ptr props) + : sink_(std::move(sink)), + props_(std::move(props)), + sink_metadata_(std::move(sink_metadata)) { + std::vector> rewriters; + rewriters.reserve(sources.size()); + for (size_t i = 0; i < sources.size(); ++i) { + std::vector> concat_rewriters; + concat_rewriters.reserve(sources[i].size()); + for (size_t j = 0; j < sources[i].size(); ++j) { + concat_rewriters.emplace_back(std::make_unique( + std::move(sources[i][j]), sink_, std::move(sources_metadata[i][j]), + props_.get())); + } + rewriters.emplace_back( + std::make_unique(std::move(concat_rewriters))); + } + rewriter_ = std::make_unique(std::move(rewriters)); + + if (props_->writer_properties()->file_encryption_properties() == nullptr) { + // Unencrypted parquet files always start with PAR1 + PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4)); + } else { + throw ParquetException( + "NotImplemented, rewriter does not support to write encrypted files."); + } + + auto new_schema = rewriter_->schema()->schema_root(); + new_schema_.Init(new_schema); + metadata_builder_ = + FileMetaDataBuilder::Make(&new_schema_, props_->writer_properties()); + if (props_->writer_properties()->page_index_enabled()) { + page_index_builder_ = PageIndexBuilder::Make(&new_schema_, nullptr); + } + } + + std::shared_ptr sink_; + std::shared_ptr props_; + std::shared_ptr sink_metadata_; + std::unique_ptr rewriter_; + + SchemaDescriptor new_schema_; + std::unique_ptr metadata_builder_; + std::unique_ptr page_index_builder_; +}; + +// ---------------------------------------------------------------------- +// ParquetFilesRewriter public API + +ParquetFileRewriter::ParquetFileRewriter() = default; + +ParquetFileRewriter::~ParquetFileRewriter() { + try { + Close(); + } catch (...) { + } +} + +std::unique_ptr ParquetFileRewriter::Open( + std::vector>> sources, + std::shared_ptr sink, + std::vector>> sources_metadata, + std::shared_ptr sink_metadata, + std::shared_ptr props) { + auto contents = GeneratedFile::Open(std::move(sources), std::move(sink), + std::move(sources_metadata), + std::move(sink_metadata), std::move(props)); + std::unique_ptr result(new ParquetFileRewriter()); + result->Open(std::move(contents)); + return result; +} + +void ParquetFileRewriter::Open(std::unique_ptr contents) { + contents_ = std::move(contents); +} + +void ParquetFileRewriter::Close() { + if (contents_) { + contents_->Close(); + contents_.reset(); + } +} + +void ParquetFileRewriter::Rewrite() { contents_->Rewrite(); } + +} // namespace parquet diff --git a/cpp/src/parquet/file_rewriter.h b/cpp/src/parquet/file_rewriter.h new file mode 100644 index 00000000000..0a208c08fee --- /dev/null +++ b/cpp/src/parquet/file_rewriter.h @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "parquet/platform.h" +#include "parquet/properties.h" +#include "parquet/type_fwd.h" + +namespace parquet { + +class PARQUET_EXPORT ParquetFileRewriter { + public: + struct PARQUET_EXPORT Contents { + virtual ~Contents() = default; + virtual void Close() = 0; + virtual void Rewrite() = 0; + }; + + ParquetFileRewriter(); + ~ParquetFileRewriter(); + + static std::unique_ptr Open( + std::vector>> sources, + std::shared_ptr sink, + std::vector>> sources_metadata, + std::shared_ptr sink_metadata = NULLPTR, + std::shared_ptr props = default_rewriter_properties()); + + void Open(std::unique_ptr contents); + void Close(); + + void Rewrite(); + + private: + std::unique_ptr contents_; +}; + +} // namespace parquet diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc index 42dd8e52ee9..445f2a3ea58 100644 --- a/cpp/src/parquet/metadata.cc +++ b/cpp/src/parquet/metadata.cc @@ -413,6 +413,10 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { inline int64_t data_page_offset() const { return column_metadata_->data_page_offset; } + inline int64_t start_offset() const { + return has_dictionary_page() ? dictionary_page_offset() : data_page_offset(); + } + inline bool has_index_page() const { return column_metadata_->__isset.index_page_offset; } @@ -454,6 +458,8 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl { return key_value_metadata_; } + const void* to_thrift() const { return column_metadata_; } + private: void InitKeyValueMetadata() { key_value_metadata_ = FromThriftKeyValueMetadata(*column_metadata_); @@ -550,6 +556,8 @@ int64_t ColumnChunkMetaData::data_page_offset() const { return impl_->data_page_offset(); } +int64_t ColumnChunkMetaData::start_offset() const { return impl_->start_offset(); } + bool ColumnChunkMetaData::has_index_page() const { return impl_->has_index_page(); } int64_t ColumnChunkMetaData::index_page_offset() const { @@ -601,6 +609,8 @@ const std::shared_ptr& ColumnChunkMetaData::key_value_me return impl_->key_value_metadata(); } +const void* ColumnChunkMetaData::to_thrift() const { return impl_->to_thrift(); } + // row-group metadata class RowGroupMetaData::RowGroupMetaDataImpl { public: @@ -1888,6 +1898,23 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { return column_builder_ptr; } + void NextColumnChunk(std::unique_ptr cc_metadata, int64_t shift) { + auto* column_chunk = &row_group_->columns[next_column_++]; + column_chunk->__set_file_offset(0); + column_chunk->__isset.meta_data = true; + column_chunk->meta_data = + *static_cast(cc_metadata->to_thrift()); + column_chunk->meta_data.__set_dictionary_page_offset( + column_chunk->meta_data.dictionary_page_offset + shift); + column_chunk->meta_data.__set_data_page_offset( + column_chunk->meta_data.data_page_offset + shift); + column_chunk->meta_data.__set_index_page_offset( + column_chunk->meta_data.index_page_offset + shift); + column_chunk->meta_data.__set_bloom_filter_offset( + column_chunk->meta_data.bloom_filter_offset + shift); + column_builders_.push_back(NULLPTR); + } + int current_column() { return next_column_ - 1; } void Finish(int64_t total_bytes_written, int16_t row_group_ordinal) { @@ -1919,6 +1946,10 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { } // sometimes column metadata is encrypted and not available to read, // so we must get total_compressed_size from column builder + if (column_builders_[i] == NULLPTR) { + total_compressed_size += row_group_->columns[i].meta_data.total_compressed_size; + continue; + } total_compressed_size += column_builders_[i]->total_compressed_size(); } @@ -1973,6 +2004,11 @@ ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() { return impl_->NextColumnChunk(); } +void RowGroupMetaDataBuilder::NextColumnChunk( + std::unique_ptr cc_metadata, int64_t shift) { + return impl_->NextColumnChunk(std::move(cc_metadata), shift); +} + int RowGroupMetaDataBuilder::current_column() const { return impl_->current_column(); } int RowGroupMetaDataBuilder::num_columns() { return impl_->num_columns(); } diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h index 3380adbf56a..972237445c9 100644 --- a/cpp/src/parquet/metadata.h +++ b/cpp/src/parquet/metadata.h @@ -159,6 +159,7 @@ class PARQUET_EXPORT ColumnChunkMetaData { bool has_dictionary_page() const; int64_t dictionary_page_offset() const; int64_t data_page_offset() const; + int64_t start_offset() const; bool has_index_page() const; int64_t index_page_offset() const; int64_t total_compressed_size() const; @@ -168,6 +169,8 @@ class PARQUET_EXPORT ColumnChunkMetaData { std::optional GetOffsetIndexLocation() const; const std::shared_ptr& key_value_metadata() const; + const void* to_thrift() const; + private: explicit ColumnChunkMetaData( const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal, @@ -488,6 +491,7 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder { ~RowGroupMetaDataBuilder(); ColumnChunkMetaDataBuilder* NextColumnChunk(); + void NextColumnChunk(std::unique_ptr cc_metadata, int64_t shift); int num_columns(); int64_t num_rows(); int current_column() const; diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc index 5c2e78c68db..ff0fe38254f 100644 --- a/cpp/src/parquet/page_index.cc +++ b/cpp/src/parquet/page_index.cc @@ -31,6 +31,7 @@ #include "arrow/util/unreachable.h" #include +#include #include namespace parquet { @@ -176,6 +177,8 @@ class TypedColumnIndexImpl : public TypedColumnIndex { return column_index_.repetition_level_histograms; } + const void* to_thrift() const override { return &column_index_; } + private: /// Wrapped thrift column index. const format::ColumnIndex column_index_; @@ -761,6 +764,11 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { column_index_builders_.back().resize(num_columns); offset_index_builders_.back().resize(num_columns); + column_indices_.emplace_back(); + offset_indices_.emplace_back(); + column_indices_.back().resize(num_columns); + offset_indices_.back().resize(num_columns); + DCHECK_EQ(column_index_builders_.size(), offset_index_builders_.size()); DCHECK_EQ(column_index_builders_.back().size(), num_columns); DCHECK_EQ(offset_index_builders_.back().size(), num_columns); @@ -784,6 +792,23 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { return builder.get(); } + void SetColumnIndex(int32_t i, + const std::shared_ptr& column_index) override { + CheckState(i); + column_indices_.back()[i] = std::make_unique( + *static_cast(column_index->to_thrift())); + } + + void SetOffsetIndex(int32_t i, const std::shared_ptr& offset_index, + int64_t shift) override { + CheckState(i); + auto index = std::make_unique(ToThrift(*offset_index)); + for (auto& page_location : index->page_locations) { + page_location.__set_offset(page_location.offset + shift); + } + offset_indices_.back()[i] = std::move(index); + } + void Finish() override { finished_ = true; } void WriteTo(::arrow::io::OutputStream* sink, @@ -796,10 +821,12 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { location->offset_index_location.clear(); /// Serialize column index ordered by row group ordinal and then column ordinal. - SerializeIndex(column_index_builders_, sink, &location->column_index_location); + SerializeIndex(column_index_builders_, column_indices_, sink, + &location->column_index_location); /// Serialize offset index ordered by row group ordinal and then column ordinal. - SerializeIndex(offset_index_builders_, sink, &location->offset_index_location); + SerializeIndex(offset_index_builders_, offset_indices_, sink, + &location->offset_index_location); } private: @@ -832,9 +859,14 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { return encryptor; } + template + using Index = std::conditional_t, + format::ColumnIndex, format::OffsetIndex>; + template void SerializeIndex( const std::vector>>& page_index_builders, + const std::vector>>>& page_indices, ::arrow::io::OutputStream* sink, std::map>>* location) const { const auto num_columns = static_cast(schema_->num_columns()); @@ -844,6 +876,9 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { /// Serialize the same kind of page index row group by row group. for (size_t row_group = 0; row_group < page_index_builders.size(); ++row_group) { + const auto& row_group_page_indices = page_indices[row_group]; + DCHECK_EQ(row_group_page_indices.size(), num_columns); + const auto& row_group_page_index_builders = page_index_builders[row_group]; DCHECK_EQ(row_group_page_index_builders.size(), num_columns); @@ -852,15 +887,20 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { /// In the same row group, serialize the same kind of page index column by column. for (size_t column = 0; column < num_columns; ++column) { + const auto& column_page_index = row_group_page_indices[column]; const auto& column_page_index_builder = row_group_page_index_builders[column]; - if (column_page_index_builder != nullptr) { + if (column_page_index != nullptr || column_page_index_builder != nullptr) { /// Get encryptor if encryption is enabled. std::shared_ptr encryptor = GetColumnMetaEncryptor( static_cast(row_group), static_cast(column), module_type); /// Try serializing the page index. PARQUET_ASSIGN_OR_THROW(int64_t pos_before_write, sink->Tell()); - column_page_index_builder->WriteTo(sink, encryptor.get()); + if (column_page_index != nullptr) { + ThriftSerializer{}.Serialize(column_page_index.get(), sink, encryptor.get()); + } else { + column_page_index_builder->WriteTo(sink, encryptor.get()); + } PARQUET_ASSIGN_OR_THROW(int64_t pos_after_write, sink->Tell()); int64_t len = pos_after_write - pos_before_write; @@ -874,6 +914,7 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { } locations[column] = {pos_before_write, static_cast(len)}; has_valid_index = true; + continue; } } @@ -887,6 +928,8 @@ class PageIndexBuilderImpl final : public PageIndexBuilder { InternalFileEncryptor* file_encryptor_; std::vector>> column_index_builders_; std::vector>> offset_index_builders_; + std::vector>> column_indices_; + std::vector>> offset_indices_; bool finished_ = false; }; diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h index 3083159783b..bc72ff21d53 100644 --- a/cpp/src/parquet/page_index.h +++ b/cpp/src/parquet/page_index.h @@ -22,6 +22,7 @@ #include "parquet/type_fwd.h" #include "parquet/types.h" +#include #include #include @@ -86,6 +87,8 @@ class PARQUET_EXPORT ColumnIndex { /// \brief List of repetition level histograms for each page concatenated together. virtual const std::vector& repetition_level_histograms() const = 0; + + virtual const void* to_thrift() const = 0; }; /// \brief Typed implementation of ColumnIndex. @@ -369,6 +372,12 @@ class PARQUET_EXPORT PageIndexBuilder { /// the PageIndexBuilder. virtual OffsetIndexBuilder* GetOffsetIndexBuilder(int32_t i) = 0; + virtual void SetColumnIndex(int32_t i, + const std::shared_ptr& column_index) = 0; + + virtual void SetOffsetIndex(int32_t i, const std::shared_ptr& offset_index, + int64_t shift) = 0; + /// \brief Complete the page index builder and no more write is allowed. virtual void Finish() = 0; diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc index 4e6c558e064..e4921c0b760 100644 --- a/cpp/src/parquet/properties.cc +++ b/cpp/src/parquet/properties.cc @@ -28,7 +28,7 @@ namespace parquet { std::shared_ptr ReaderProperties::GetStream( - std::shared_ptr source, int64_t start, int64_t num_bytes) { + std::shared_ptr source, int64_t start, int64_t num_bytes) const { if (buffered_stream_enabled_) { // ARROW-6180 / PARQUET-1636 Create isolated reader that references segment // of source diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 5a1799c39d7..e02b22a2182 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -77,7 +77,7 @@ class PARQUET_EXPORT ReaderProperties { MemoryPool* memory_pool() const { return pool_; } std::shared_ptr GetStream(std::shared_ptr source, - int64_t start, int64_t num_bytes); + int64_t start, int64_t num_bytes) const; /// Buffered stream reading allows the user to control the memory usage of /// parquet readers. This ensure that all `RandomAccessFile::ReadAt` calls are @@ -1409,4 +1409,74 @@ struct ArrowWriteContext { PARQUET_EXPORT std::shared_ptr default_arrow_writer_properties(); +class PARQUET_EXPORT RewriterProperties { + public: + class Builder { + public: + Builder() + : pool_(::arrow::default_memory_pool()), + writer_properties_(default_writer_properties()), + reader_properties_(default_reader_properties()) {} + + explicit Builder(const RewriterProperties& properties) + : pool_(properties.memory_pool()), + writer_properties_(properties.writer_properties()), + reader_properties_(properties.reader_properties()) {} + + virtual ~Builder() = default; + + /// Specify the memory pool for the rewriter. Default default_memory_pool. + Builder* memory_pool(MemoryPool* pool) { + pool_ = pool; + return this; + } + + /// Set the writer properties. + Builder* writer_properties(std::shared_ptr properties) { + writer_properties_ = std::move(properties); + return this; + } + + /// Set the reader properties. + Builder* reader_properties(ReaderProperties properties) { + reader_properties_ = std::move(properties); + return this; + } + + /// Build the RewriterProperties with the builder parameters. + std::shared_ptr build() { + return std::shared_ptr(new RewriterProperties( + pool_, std::move(writer_properties_), std::move(reader_properties_))); + } + + private: + MemoryPool* pool_; + std::shared_ptr writer_properties_; + ReaderProperties reader_properties_; + }; + + MemoryPool* memory_pool() const { return pool_; } + + const std::shared_ptr& writer_properties() const { + return writer_properties_; + } + + const ReaderProperties& reader_properties() const { return reader_properties_; } + + private: + explicit RewriterProperties(MemoryPool* pool, + std::shared_ptr writer_properties, + ReaderProperties reader_properties) + : pool_(pool), + writer_properties_(std::move(writer_properties)), + reader_properties_(std::move(reader_properties)) {} + + MemoryPool* pool_; + std::shared_ptr writer_properties_; + ReaderProperties reader_properties_; +}; + +PARQUET_EXPORT +const std::shared_ptr& default_rewriter_properties(); + } // namespace parquet diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h index 8f82adae928..50d032751fa 100644 --- a/cpp/src/parquet/thrift_internal.h +++ b/cpp/src/parquet/thrift_internal.h @@ -17,6 +17,7 @@ #pragma once +#include "parquet/page_index.h" #include "parquet/windows_compatibility.h" #include @@ -554,6 +555,30 @@ static inline format::SizeStatistics ToThrift(const SizeStatistics& size_stats) return size_statistics; } +static inline format::PageLocation ToThrift(const PageLocation& page_location) { + format::PageLocation thrift_page_location; + thrift_page_location.__set_offset(page_location.offset); + thrift_page_location.__set_compressed_page_size(page_location.compressed_page_size); + thrift_page_location.__set_first_row_index(page_location.first_row_index); + return thrift_page_location; +} + +static inline format::OffsetIndex ToThrift(const OffsetIndex& offset_index) { + format::OffsetIndex thrift_offset_index; + std::vector thrift_page_locations; + thrift_page_locations.reserve(offset_index.page_locations().size()); + for (const auto& page_location : offset_index.page_locations()) { + thrift_page_locations.push_back(ToThrift(page_location)); + } + thrift_offset_index.page_locations = std::move(thrift_page_locations); + if (!offset_index.unencoded_byte_array_data_bytes().empty()) { + thrift_offset_index.__set_unencoded_byte_array_data_bytes( + offset_index.unencoded_byte_array_data_bytes()); + thrift_offset_index.__isset.unencoded_byte_array_data_bytes = true; + } + return thrift_offset_index; +} + // ---------------------------------------------------------------------- // Thrift struct serialization / deserialization utilities