Skip to content

Commit c216849

Browse files
committed
Implement basic parquet file rewriter
1 parent 8e3d849 commit c216849

File tree

14 files changed

+951
-40
lines changed

14 files changed

+951
-40
lines changed

cpp/src/parquet/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ set(PARQUET_SRCS
171171
encryption/internal_file_encryptor.cc
172172
exception.cc
173173
file_reader.cc
174+
file_rewriter.cc
174175
file_writer.cc
175176
geospatial/statistics.cc
176177
geospatial/util_internal.cc
@@ -408,6 +409,8 @@ add_parquet_test(arrow-reader-writer-test
408409
arrow/arrow_statistics_test.cc
409410
arrow/variant_test.cc)
410411

412+
add_parquet_test(arrow-rewriter-test SOURCES arrow/arrow_rewriter_test.cc)
413+
411414
add_parquet_test(arrow-internals-test SOURCES arrow/path_internal_test.cc
412415
arrow/reconstruct_internal_test.cc)
413416

cpp/src/parquet/arrow/arrow_reader_writer_test.cc

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -375,19 +375,19 @@ const double test_traits<::arrow::DoubleType>::value(4.2);
375375
template <>
376376
struct test_traits<::arrow::StringType> {
377377
static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
378-
static std::string const value;
378+
static const std::string value;
379379
};
380380

381381
template <>
382382
struct test_traits<::arrow::BinaryType> {
383383
static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
384-
static std::string const value;
384+
static const std::string value;
385385
};
386386

387387
template <>
388388
struct test_traits<::arrow::FixedSizeBinaryType> {
389389
static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY;
390-
static std::string const value;
390+
static const std::string value;
391391
};
392392

393393
const std::string test_traits<::arrow::StringType>::value("Test"); // NOLINT
@@ -5906,28 +5906,6 @@ auto encode_double = [](double value) {
59065906

59075907
class ParquetPageIndexRoundTripTest : public ::testing::Test {
59085908
public:
5909-
void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties,
5910-
const std::shared_ptr<::arrow::Table>& table) {
5911-
// Get schema from table.
5912-
auto schema = table->schema();
5913-
std::shared_ptr<SchemaDescriptor> parquet_schema;
5914-
auto arrow_writer_properties = default_arrow_writer_properties();
5915-
ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
5916-
*arrow_writer_properties, &parquet_schema));
5917-
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
5918-
5919-
// Write table to buffer.
5920-
auto sink = CreateOutputStream();
5921-
auto pool = ::arrow::default_memory_pool();
5922-
auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
5923-
std::unique_ptr<FileWriter> arrow_writer;
5924-
ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
5925-
&arrow_writer));
5926-
ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
5927-
ASSERT_OK_NO_THROW(arrow_writer->Close());
5928-
ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish());
5929-
}
5930-
59315909
void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages,
59325910
const std::set<int>& expect_columns_without_index = {}) {
59335911
auto read_properties = default_arrow_reader_properties();
@@ -6015,7 +5993,8 @@ TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTrip) {
60155993
[null, "d", [] ],
60165994
[5, null, [3, 3, 3]],
60175995
[6, "f", null ]
6018-
])"}));
5996+
])"}),
5997+
buffer_);
60195998

60205999
ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1);
60216000

@@ -6057,7 +6036,8 @@ TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithStatsDisabled) {
60576036
[null, "d", [] ],
60586037
[5, null, [3, 3, 3]],
60596038
[6, "f", null ]
6060-
])"}));
6039+
])"}),
6040+
buffer_);
60616041

60626042
ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1);
60636043
for (auto& column_index : column_indexes_) {
@@ -6082,7 +6062,8 @@ TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTripWithColumnStatsDisabled) {
60826062
[null, "d", [] ],
60836063
[5, null, [3, 3, 3]],
60846064
[6, "f", null ]
6085-
])"}));
6065+
])"}),
6066+
buffer_);
60866067

60876068
ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1);
60886069

@@ -6116,7 +6097,8 @@ TEST_F(ParquetPageIndexRoundTripTest, DropLargeStats) {
61166097
WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([
61176098
["short_string"],
61186099
["very_large_string_to_drop_stats"]
6119-
])"}));
6100+
])"}),
6101+
buffer_);
61206102

61216103
ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1);
61226104

@@ -6140,7 +6122,8 @@ TEST_F(ParquetPageIndexRoundTripTest, MultiplePages) {
61406122
writer_properties,
61416123
::arrow::TableFromJSON(
61426124
schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])",
6143-
R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"}));
6125+
R"([[null, null], [6, "f"]])", R"([[null, null], [null, null]])"}),
6126+
buffer_);
61446127

61456128
ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/4);
61466129

@@ -6180,7 +6163,7 @@ TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) {
61806163

61816164
auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::float64())});
61826165
auto table = Table::Make(schema, {chunked_array});
6183-
WriteFile(writer_properties, table);
6166+
WriteFile(writer_properties, table, buffer_);
61846167

61856168
ReadPageIndexes(/*expect_num_row_groups=*/4, /*expect_num_pages=*/1);
61866169

@@ -6215,7 +6198,8 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
62156198
->enable_write_page_index("c0") /* enable c0 explicitly */
62166199
->disable_write_page_index("c1") /* disable c1 explicitly */
62176200
->build();
6218-
WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([[0, 1, 2]])"}));
6201+
WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([[0, 1, 2]])"}),
6202+
buffer_);
62196203

62206204
ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/1,
62216205
/*expect_columns_without_index=*/{1});
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/io/memory.h"
19+
#include "arrow/testing/gtest_util.h"
20+
#include "parquet/arrow/reader.h"
21+
#include "parquet/file_reader.h"
22+
#include "parquet/file_rewriter.h"
23+
#ifdef _MSC_VER
24+
# pragma warning(push)
25+
// Disable forcing value to bool warnings
26+
# pragma warning(disable : 4800)
27+
#endif
28+
29+
#include <memory>
30+
31+
#include "gtest/gtest.h"
32+
33+
#include "parquet/arrow/test_util.h"
34+
#include "parquet/platform.h"
35+
#include "parquet/properties.h"
36+
37+
using arrow::Table;
38+
using arrow::io::BufferReader;
39+
40+
namespace parquet::arrow {
41+
42+
TEST(ParquetRewriterTest, ConcatRoundTrip) {
43+
auto rewriter_properties =
44+
RewriterProperties::Builder()
45+
.writer_properties(
46+
WriterProperties::Builder().enable_write_page_index()->build())
47+
->build();
48+
49+
auto schema = ::arrow::schema(
50+
{::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())});
51+
52+
std::shared_ptr<Buffer> buffer_up;
53+
std::shared_ptr<Buffer> buffer_down;
54+
55+
WriteFile(rewriter_properties->writer_properties(),
56+
::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"]])"}), buffer_up);
57+
WriteFile(rewriter_properties->writer_properties(),
58+
::arrow::TableFromJSON(schema, {R"([[3, "c"]])"}), buffer_down);
59+
60+
auto sink = CreateOutputStream();
61+
auto rewriter =
62+
ParquetFileRewriter::Open({{std::make_shared<BufferReader>(buffer_up),
63+
std::make_shared<BufferReader>(buffer_down)}},
64+
sink, {{NULLPTR, NULLPTR}}, NULLPTR, rewriter_properties);
65+
rewriter->Rewrite();
66+
rewriter->Close();
67+
68+
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> out_buffer, sink->Finish());
69+
auto file_reader = ParquetFileReader::Open(std::make_shared<BufferReader>(out_buffer));
70+
std::unique_ptr<FileReader> reader;
71+
ASSERT_OK_NO_THROW(
72+
FileReader::Make(::arrow::default_memory_pool(), std::move(file_reader), &reader));
73+
74+
std::shared_ptr<Table> table;
75+
ASSERT_OK(reader->ReadTable(&table));
76+
ASSERT_OK(table->ValidateFull());
77+
78+
auto expected_table =
79+
::arrow::TableFromJSON(schema, {R"([[1, "a"], [2, "b"], [3, "c"]])"});
80+
AssertTablesEqual(*expected_table, *table);
81+
}
82+
83+
TEST(ParquetRewriterTest, DISABLED_ExtendRoundTrip) {
84+
auto rewriter_properties =
85+
RewriterProperties::Builder()
86+
.writer_properties(
87+
WriterProperties::Builder().enable_write_page_index()->build())
88+
->build();
89+
90+
auto left_schema = ::arrow::schema(
91+
{::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())});
92+
auto right_schema = ::arrow::schema({::arrow::field("c", ::arrow::int64())});
93+
94+
std::shared_ptr<Buffer> buffer_left;
95+
std::shared_ptr<Buffer> buffer_right;
96+
97+
WriteFile(rewriter_properties->writer_properties(),
98+
::arrow::TableFromJSON(left_schema, {R"([[1, "a"], [2, "b"], [3, "c"]])"}),
99+
buffer_left);
100+
WriteFile(rewriter_properties->writer_properties(),
101+
::arrow::TableFromJSON(right_schema, {R"([[10], [20], [30]])"}),
102+
buffer_right);
103+
104+
auto sink = CreateOutputStream();
105+
auto rewriter = ParquetFileRewriter::Open(
106+
{{std::make_shared<BufferReader>(buffer_left)},
107+
{std::make_shared<BufferReader>(buffer_right)}},
108+
sink, {{NULLPTR}, {NULLPTR}}, NULLPTR, rewriter_properties);
109+
rewriter->Rewrite();
110+
rewriter->Close();
111+
112+
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> out_buffer, sink->Finish());
113+
auto file_reader = ParquetFileReader::Open(std::make_shared<BufferReader>(out_buffer));
114+
std::unique_ptr<FileReader> reader;
115+
ASSERT_OK_NO_THROW(
116+
FileReader::Make(::arrow::default_memory_pool(), std::move(file_reader), &reader));
117+
118+
std::shared_ptr<Table> table;
119+
ASSERT_OK(reader->ReadTable(&table));
120+
ASSERT_OK(table->ValidateFull());
121+
122+
auto expected_schema = ::arrow::schema({::arrow::field("a", ::arrow::int32()),
123+
::arrow::field("b", ::arrow::utf8()),
124+
::arrow::field("c", ::arrow::int64())});
125+
auto expected_table = ::arrow::TableFromJSON(
126+
expected_schema, {R"([[1, "a", 10], [2, "b", 20], [3, "c", 30]])"});
127+
AssertTablesEqual(*expected_table, *table);
128+
}
129+
130+
TEST(ParquetRewriterTest, DISABLED_SimpleRoundTrip) {
131+
auto rewriter_properties = RewriterProperties::Builder()
132+
.writer_properties(WriterProperties::Builder()
133+
.enable_write_page_index()
134+
->max_row_group_length(1)
135+
->build())
136+
->build();
137+
138+
auto left_schema = ::arrow::schema(
139+
{::arrow::field("a", ::arrow::int32()), ::arrow::field("b", ::arrow::utf8())});
140+
auto right_schema = ::arrow::schema({::arrow::field("c", ::arrow::int64())});
141+
142+
std::shared_ptr<Buffer> buffer_left_up;
143+
std::shared_ptr<Buffer> buffer_left_down;
144+
std::shared_ptr<Buffer> buffer_right_up;
145+
std::shared_ptr<Buffer> buffer_right_down;
146+
147+
WriteFile(rewriter_properties->writer_properties(),
148+
::arrow::TableFromJSON(left_schema, {R"([[1, "a"], [2, "b"]])"}),
149+
buffer_left_up);
150+
WriteFile(rewriter_properties->writer_properties(),
151+
::arrow::TableFromJSON(left_schema, {R"([[3, "c"]])"}), buffer_left_down);
152+
WriteFile(rewriter_properties->writer_properties(),
153+
::arrow::TableFromJSON(right_schema, {R"([[10]])"}), buffer_right_up);
154+
WriteFile(rewriter_properties->writer_properties(),
155+
::arrow::TableFromJSON(right_schema, {R"([[20], [30]])"}), buffer_right_down);
156+
157+
auto sink = CreateOutputStream();
158+
auto rewriter = ParquetFileRewriter::Open(
159+
{{std::make_shared<BufferReader>(buffer_left_up),
160+
std::make_shared<BufferReader>(buffer_left_down)},
161+
{std::make_shared<BufferReader>(buffer_right_up),
162+
std::make_shared<BufferReader>(buffer_right_down)}},
163+
sink, {{NULLPTR, NULLPTR}, {NULLPTR, NULLPTR}}, NULLPTR, rewriter_properties);
164+
rewriter->Rewrite();
165+
rewriter->Close();
166+
167+
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> out_buffer, sink->Finish());
168+
auto file_reader = ParquetFileReader::Open(std::make_shared<BufferReader>(out_buffer));
169+
std::unique_ptr<FileReader> reader;
170+
ASSERT_OK_NO_THROW(
171+
FileReader::Make(::arrow::default_memory_pool(), std::move(file_reader), &reader));
172+
173+
std::shared_ptr<Table> table;
174+
ASSERT_OK(reader->ReadTable(&table));
175+
ASSERT_OK(table->ValidateFull());
176+
177+
auto expected_schema = ::arrow::schema({::arrow::field("a", ::arrow::int32()),
178+
::arrow::field("b", ::arrow::utf8()),
179+
::arrow::field("c", ::arrow::int64())});
180+
auto expected_table = ::arrow::TableFromJSON(
181+
expected_schema, {R"([[1, "a", 10], [2, "b", 20], [3, "c", 30]])"});
182+
AssertTablesEqual(*expected_table, *table);
183+
}
184+
185+
} // namespace parquet::arrow

cpp/src/parquet/arrow/test_util.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,23 @@
2828
#include "arrow/array/builder_binary.h"
2929
#include "arrow/array/builder_decimal.h"
3030
#include "arrow/array/builder_primitive.h"
31+
#include "arrow/table.h"
3132
#include "arrow/testing/gtest_util.h"
3233
#include "arrow/testing/random.h"
3334
#include "arrow/type_fwd.h"
3435
#include "arrow/type_traits.h"
3536
#include "arrow/util/decimal.h"
3637
#include "arrow/util/float16.h"
38+
#include "parquet/arrow/schema.h"
39+
#include "parquet/arrow/writer.h"
3740
#include "parquet/column_reader.h"
41+
#include "parquet/file_writer.h"
3842
#include "parquet/test_util.h"
3943

4044
namespace parquet {
4145

4246
using internal::RecordReader;
47+
using schema::GroupNode;
4348

4449
namespace arrow {
4550

@@ -482,6 +487,29 @@ void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
482487
EXPECT_TRUE(result->Equals(*expected_array));
483488
}
484489

490+
void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties,
491+
const std::shared_ptr<::arrow::Table>& table,
492+
std::shared_ptr<Buffer>& buffer) {
493+
// Get schema from table.
494+
auto schema = table->schema();
495+
std::shared_ptr<SchemaDescriptor> parquet_schema;
496+
auto arrow_writer_properties = default_arrow_writer_properties();
497+
ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
498+
*arrow_writer_properties, &parquet_schema));
499+
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
500+
501+
// Write table to buffer.
502+
auto sink = CreateOutputStream();
503+
auto pool = ::arrow::default_memory_pool();
504+
auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties);
505+
std::unique_ptr<FileWriter> arrow_writer;
506+
ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties,
507+
&arrow_writer));
508+
ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
509+
ASSERT_OK_NO_THROW(arrow_writer->Close());
510+
ASSERT_OK_AND_ASSIGN(buffer, sink->Finish());
511+
}
512+
485513
} // namespace arrow
486514

487515
} // namespace parquet

0 commit comments

Comments
 (0)