
iceberg: add batching parquet writer factory #23683

Merged · 7 commits · Oct 11, 2024
6 changes: 3 additions & 3 deletions src/v/datalake/batching_parquet_writer.cc
@@ -71,7 +71,7 @@ batching_parquet_writer::initialize(std::filesystem::path output_file_path) {
         co_return data_writer_error::file_io_error;
     }

-    _result.file_path = _output_file_path.string();
+    _result.remote_path = _output_file_path.string();
     co_return data_writer_error::ok;
 }

@@ -102,7 +102,7 @@ ss::future<data_writer_error> batching_parquet_writer::add_data_struct(
     co_return data_writer_error::ok;
 }

-ss::future<result<data_writer_result, data_writer_error>>
+ss::future<result<coordinator::data_file, data_writer_error>>
 batching_parquet_writer::finish() {
     auto write_result = co_await write_row_group();
     if (write_result != data_writer_error::ok) {

@@ -153,7 +153,7 @@ ss::future<data_writer_error> batching_parquet_writer::write_row_group() {
     iobuf out;
     try {
         auto chunk = _iceberg_to_arrow.take_chunk();
-        _result.record_count += _row_count;
+        _result.row_count += _row_count;
         _row_count = 0;
         _byte_count = 0;
         _arrow_to_iobuf.add_arrow_array(chunk);
4 changes: 2 additions & 2 deletions src/v/datalake/batching_parquet_writer.h
@@ -50,7 +50,7 @@ class batching_parquet_writer : public data_writer {
     ss::future<data_writer_error>
     add_data_struct(iceberg::struct_value data, int64_t approx_size) override;

-    ss::future<result<data_writer_result, data_writer_error>> finish() override;
+    ss::future<result<coordinator::data_file, data_writer_error>> finish() override;

     // Close the file handle, delete any temporary data and clean up any other
     // state.

@@ -73,7 +73,7 @@ class batching_parquet_writer : public data_writer {
     std::filesystem::path _output_file_path;
     ss::file _output_file;
     ss::output_stream<char> _output_stream;
-    data_writer_result _result;
+    coordinator::data_file _result;
 };

 class batching_parquet_writer_factory : public data_writer_factory {
Reviewer (Contributor): optional: could have also made this a nested class so it would be batching_parquet_writer::factory

Author (Contributor): given the number of things stacked above this, and how pervasive a change that would be, I'd rather make it a separate PR.
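For illustration, a minimal sketch of the suggested nesting, assuming the data_writer_factory interface implied elsewhere in this diff; the create_writer signature is a guess, since the factory's declaration is not visible in this PR:

```cpp
// Hypothetical sketch of the reviewer's suggestion; not part of this PR.
// The create_writer() signature is assumed, not taken from the diff.
class batching_parquet_writer : public data_writer {
public:
    // ... writer interface as declared above ...

    // Nested factory, referenced at call sites as
    // batching_parquet_writer::factory.
    class factory : public data_writer_factory {
    public:
        std::unique_ptr<data_writer>
        create_writer(iceberg::struct_type schema) override;
    };
};
```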

14 changes: 2 additions & 12 deletions src/v/datalake/data_writer_interface.h
@@ -10,6 +10,7 @@
 #pragma once

 #include "base/outcome.h"
+#include "coordinator/data_file.h"
Reviewer (Contributor), on the added include: datalake/coordinator/data_file.h

#include "datalake/schemaless_translator.h"
#include "iceberg/datatypes.h"
#include "iceberg/values.h"
Expand All @@ -25,17 +26,6 @@ enum class data_writer_error {
file_io_error,
};

struct data_writer_result
: serde::envelope<
data_writer_result,
serde::version<0>,
serde::compat_version<0>> {
ss::sstring file_path = "";
size_t record_count = 0;
size_t file_size_bytes = 0;

auto serde_fields() { return std::tie(record_count); }
};

struct data_writer_error_category : std::error_category {
const char* name() const noexcept override { return "Data Writer Error"; }
Expand Down Expand Up @@ -69,7 +59,7 @@ class data_writer {
iceberg::struct_value /* data */, int64_t /* approx_size */)
= 0;

virtual ss::future<result<data_writer_result, data_writer_error>>
virtual ss::future<result<coordinator::data_file, data_writer_error>>
finish() = 0;
};

6 changes: 3 additions & 3 deletions src/v/datalake/record_multiplexer.cc
@@ -69,14 +69,14 @@ record_multiplexer::operator()(model::record_batch batch) {
     co_return ss::stop_iteration::no;
 }

-ss::future<result<chunked_vector<data_writer_result>, data_writer_error>>
+ss::future<result<chunked_vector<coordinator::data_file>, data_writer_error>>
 record_multiplexer::end_of_stream() {
     if (_writer_status != data_writer_error::ok) {
         co_return _writer_status;
     }
     // TODO: once we have multiple _writers this should be a loop
     if (_writer) {
-        chunked_vector<data_writer_result> ret;
+        chunked_vector<coordinator::data_file> ret;
         auto res = co_await _writer->finish();
         if (res.has_value()) {
             ret.push_back(res.value());

@@ -85,7 +85,7 @@ record_multiplexer::end_of_stream() {
             co_return res.error();
         }
     } else {
-        co_return chunked_vector<data_writer_result>{};
+        co_return chunked_vector<coordinator::data_file>{};
     }
 }

2 changes: 1 addition & 1 deletion src/v/datalake/record_multiplexer.h
@@ -42,7 +42,7 @@ class record_multiplexer {
     explicit record_multiplexer(
       std::unique_ptr<data_writer_factory> writer_factory);
     ss::future<ss::stop_iteration> operator()(model::record_batch batch);
-    ss::future<result<chunked_vector<data_writer_result>, data_writer_error>>
+    ss::future<result<chunked_vector<coordinator::data_file>, data_writer_error>>
     end_of_stream();

 private:
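Taken together with the constructor above, end-to-end usage looks roughly like this sketch, inside a Seastar coroutine; the namespace, factory arguments, and batch source are assumptions, not code from this PR:

```cpp
// Hypothetical driver showing the intended flow; exact wiring in Redpanda
// differs. The factory's constructor arguments are not shown in this diff.
auto mux = datalake::record_multiplexer(
  std::make_unique<datalake::batching_parquet_writer_factory>(/* ... */));

// Feed record batches until the source is exhausted (batch source assumed).
for (auto& batch : batches) {
    auto stop = co_await mux(std::move(batch));
    if (stop == ss::stop_iteration::yes) {
        break;
    }
}

// Flush writers; on success, one coordinator::data_file per writer describes
// a parquet output file (remote_path, row_count, file_size_bytes).
auto files = co_await mux.end_of_stream();
if (!files.has_value()) {
    co_return files.error();
}
```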
4 changes: 2 additions & 2 deletions src/v/datalake/tests/batching_parquet_writer_test.cc
@@ -44,8 +44,8 @@ TEST(BatchingParquetWriterTest, WritesParquetFiles) {

     auto result = writer.finish().get0();
     ASSERT_TRUE(result.has_value());
-    EXPECT_EQ(result.value().file_path, file_path);
-    EXPECT_EQ(result.value().record_count, num_rows);
+    EXPECT_EQ(result.value().remote_path, file_path);
+    EXPECT_EQ(result.value().row_count, num_rows);
     auto true_file_size = std::filesystem::file_size(file_path);
     EXPECT_EQ(result.value().file_size_bytes, true_file_size);

4 changes: 2 additions & 2 deletions src/v/datalake/tests/gtest_record_multiplexer_test.cc
@@ -44,7 +44,7 @@ TEST(DatalakeMultiplexerTest, TestMultiplexer) {

     ASSERT_TRUE(result.has_value());
     ASSERT_EQ(result.value().size(), 1);
-    EXPECT_EQ(result.value()[0].record_count, record_count * batch_count);
+    EXPECT_EQ(result.value()[0].row_count, record_count * batch_count);
 }
 TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
     int record_count = 10;

@@ -102,7 +102,7 @@ TEST(DatalakeMultiplexerTest, WritesDataFiles) {

     ASSERT_TRUE(result.has_value());
     ASSERT_EQ(result.value().size(), 1);
-    EXPECT_EQ(result.value()[0].record_count, record_count * batch_count);
+    EXPECT_EQ(result.value()[0].row_count, record_count * batch_count);

     // Open the resulting file and check that it has data in it with the
     // appropriate counts.
8 changes: 4 additions & 4 deletions src/v/datalake/tests/test_data_writer.h
@@ -29,22 +29,22 @@ class test_data_writer : public data_writer {

     ss::future<data_writer_error> add_data_struct(
       iceberg::struct_value /* data */, int64_t /* approx_size */) override {
-        _result.record_count++;
+        _result.row_count++;
         data_writer_error status
           = _return_error ? data_writer_error::parquet_conversion_error
                           : data_writer_error::ok;
         return ss::make_ready_future<data_writer_error>(status);
     }

-    ss::future<result<data_writer_result, data_writer_error>>
+    ss::future<result<coordinator::data_file, data_writer_error>>
     finish() override {
         return ss::make_ready_future<
-          result<data_writer_result, data_writer_error>>(_result);
+          result<coordinator::data_file, data_writer_error>>(_result);
     }

 private:
     iceberg::struct_type _schema;
-    data_writer_result _result;
+    coordinator::data_file _result;
     bool _return_error;
 };
 class test_data_writer_factory : public data_writer_factory {