iceberg: add batching parquet writer factory #23683

Merged: 7 commits merged on Oct 11, 2024

64 changes: 61 additions & 3 deletions src/v/datalake/batching_parquet_writer.cc
@@ -10,9 +10,11 @@

#include "datalake/batching_parquet_writer.h"

#include "base/vlog.h"
#include "bytes/iostream.h"
#include "datalake/arrow_translator.h"
#include "datalake/data_writer_interface.h"
#include "datalake/logger.h"

#include <seastar/core/file-types.hh>
#include <seastar/core/fstream.hh>
@@ -22,6 +24,9 @@
#include <seastar/coroutine/as_future.hh>

#include <cstdint>
#include <exception>
#include <memory>
#include <utility>

namespace datalake {

@@ -43,20 +48,30 @@ batching_parquet_writer::initialize(std::filesystem::path output_file_path) {
ss::open_flags::create | ss::open_flags::truncate
| ss::open_flags::wo);
} catch (...) {
vlog(
datalake_log.error,
"Error opening output file {}: {}",
_output_file_path,
std::current_exception());
co_return data_writer_error::file_io_error;
}
bool error = false;
try {
_output_stream = co_await ss::make_file_output_stream(_output_file);
} catch (...) {
vlog(
datalake_log.error,
"Error making output stream for file {}: {}",
_output_file_path,
std::current_exception());
error = true;
}
if (error) {
co_await _output_file.close();
co_return data_writer_error::file_io_error;
}

_result.file_path = _output_file_path.string();
_result.remote_path = _output_file_path.string();
co_return data_writer_error::ok;
}

@@ -66,6 +81,10 @@ ss::future<data_writer_error> batching_parquet_writer::add_data_struct(
try {
_iceberg_to_arrow.add_data(std::move(data));
} catch (...) {
vlog(
datalake_log.error,
"Error adding data value to Arrow table: {}",
std::current_exception());
error = true;
}
if (error) {
@@ -83,7 +102,7 @@ ss::future<data_writer_error> batching_parquet_writer::add_data_struct(
co_return data_writer_error::ok;
}

ss::future<result<data_writer_result, data_writer_error>>
ss::future<result<coordinator::data_file, data_writer_error>>
batching_parquet_writer::finish() {
auto write_result = co_await write_row_group();
if (write_result != data_writer_error::ok) {
@@ -96,6 +115,10 @@ batching_parquet_writer::finish() {
out = _arrow_to_iobuf.close_and_take_iobuf();
_result.file_size_bytes += out.size_bytes();
} catch (...) {
vlog(
datalake_log.error,
"Error closing arrow_to_iobuf stream: {}",
std::current_exception());
error = true;
}
if (error) {
@@ -107,6 +130,10 @@ batching_parquet_writer::finish() {
co_await write_iobuf_to_output_stream(std::move(out), _output_stream);
co_await _output_stream.close();
} catch (...) {
vlog(
datalake_log.error,
"Error closing output stream: {}",
std::current_exception());
error = true;
}
if (error) {
@@ -126,13 +153,17 @@ ss::future<data_writer_error> batching_parquet_writer::write_row_group() {
iobuf out;
try {
auto chunk = _iceberg_to_arrow.take_chunk();
_result.record_count += _row_count;
_result.row_count += _row_count;
_row_count = 0;
_byte_count = 0;
_arrow_to_iobuf.add_arrow_array(chunk);
out = _arrow_to_iobuf.take_iobuf();
_result.file_size_bytes += out.size_bytes();
} catch (...) {
vlog(
datalake_log.error,
"Error converting Arrow to Parquet iobuf: {}",
std::current_exception());
error = true;
}
if (error) {
@@ -142,6 +173,10 @@ ss::future<data_writer_error> batching_parquet_writer::write_row_group() {
try {
co_await write_iobuf_to_output_stream(std::move(out), _output_stream);
} catch (...) {
vlog(
datalake_log.error,
"Error writing to output stream: {}",
std::current_exception());
error = true;
}
if (error) {
@@ -159,4 +194,27 @@ ss::future<> batching_parquet_writer::abort() {
}
}

batching_parquet_writer_factory::batching_parquet_writer_factory(
std::filesystem::path local_directory,
Contributor:

Architecturally, I believe it will be simpler to keep these parquet files in memory. AFAIK we're going to load these fully into memory already to send them, and it's fairly straightforward to have a semaphore to make sure we stay within the subsystem's memory budget. Writing to disk, especially with our trends of smaller and smaller disks, is going to come with a number of challenges: cleaning up zombie files, integrating with space management, etc.

Now we can do this after the beta phase, but we should keep this in mind as we're structuring the codepaths (see the semaphore sketch after the factory constructor below).


Contributor Author:

I agree that we do want to stream directly to S3 eventually. I'd rather stick with this for now so we can get an end-to-end test ASAP.

FWIW, we can stream from disk to S3; we don't need to load it all into memory first. cloud_storage::remote::upload_controller_snapshot is an example of that (see the streaming sketch after the factory constructor below).

Contributor:

local_directory

Todo for future: as we chatted offline, this is internal to the writer, so it should self-manage this path and its lifecycle.

ss::sstring file_name_prefix,
size_t row_count_threshold,
size_t byte_count_threshold)
: _local_directory{std::move(local_directory)}
, _file_name_prefix{std::move(file_name_prefix)}
, _row_count_threshold{row_count_threshold}
, _byte_count_treshold{byte_count_threshold} {}
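
To illustrate the reviewer's memory-budget suggestion above: a minimal sketch of bounding in-flight parquet buffers with a Seastar semaphore. The class and all of its names are hypothetical and not part of this PR.

```cpp
// Hypothetical sketch, not part of this PR: cap the bytes held in memory by
// parquet buffers using a semaphore sized to the subsystem's memory budget.
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>

#include <cstddef>

class parquet_memory_budget {
public:
    explicit parquet_memory_budget(size_t budget_bytes)
      : _sem(budget_bytes) {}

    // Waits (asynchronously) until `bytes` of budget is free, then runs `fn`,
    // which is expected to return a future.
    template<typename AsyncFunc>
    seastar::future<> with_reservation(size_t bytes, AsyncFunc fn) {
        auto units = co_await seastar::get_units(_sem, bytes);
        co_await fn();
        // `units` is destroyed here, releasing `bytes` back to the budget.
    }

private:
    seastar::semaphore _sem;
};
```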

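And for the disk-to-S3 point in the reply: a rough sketch (an assumed helper, not the cloud_storage code) of forwarding a file to an output stream one chunk at a time, so the whole file never has to sit in memory.

```cpp
// Assumed illustration, not from this PR or from cloud_storage: copy a file
// into an output stream chunk by chunk instead of loading it whole.
#include <seastar/core/coroutine.hh>
#include <seastar/core/file-types.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/seastar.hh>

#include <filesystem>

seastar::future<> stream_file_into(
  std::filesystem::path path, seastar::output_stream<char>& out) {
    auto f = co_await seastar::open_file_dma(
      path.string(), seastar::open_flags::ro);
    auto in = seastar::make_file_input_stream(std::move(f));
    while (true) {
        auto buf = co_await in.read();
        if (buf.empty()) {
            break; // end of file
        }
        co_await out.write(buf.get(), buf.size());
    }
    co_await in.close();
}
```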
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
batching_parquet_writer_factory::create_writer(iceberg::struct_type schema) {
auto ret = ss::make_shared<batching_parquet_writer>(
std::move(schema), _row_count_threshold, _byte_count_treshold);
std::string filename = fmt::format(
"{}-{}.parquet", _file_name_prefix, uuid_t::create());
std::filesystem::path file_path = _local_directory / filename;
auto err = co_await ret->initialize(file_path);
if (err != data_writer_error::ok) {
co_return err;
}
co_return ret;
}
} // namespace datalake
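
For reference, a usage sketch built only from the interfaces added in this diff; the directory, prefix, thresholds, and record value below are placeholders, not values used by the PR.

```cpp
// Usage sketch of the new factory; all concrete values here are placeholders.
#include "datalake/batching_parquet_writer.h"

#include <seastar/core/coroutine.hh>

ss::future<datalake::data_writer_error>
write_one(iceberg::struct_type schema, iceberg::struct_value value) {
    datalake::batching_parquet_writer_factory factory(
      "/tmp/datalake-example", // local_directory (placeholder)
      "example",               // file_name_prefix (placeholder)
      /*row_count_threshold=*/1000,
      /*byte_count_threshold=*/16 * 1024 * 1024);

    auto writer_res = co_await factory.create_writer(std::move(schema));
    if (!writer_res.has_value()) {
        co_return writer_res.error();
    }
    auto writer = std::move(writer_res.value());

    auto err = co_await writer->add_data_struct(
      std::move(value), /*approx_size=*/64);
    if (err != datalake::data_writer_error::ok) {
        co_return err;
    }
    auto file = co_await writer->finish();
    co_return file.has_value() ? datalake::data_writer_error::ok : file.error();
}
```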
24 changes: 22 additions & 2 deletions src/v/datalake/batching_parquet_writer.h
@@ -21,6 +21,7 @@
#include <base/seastarx.h>

#include <filesystem>
#include <memory>

namespace datalake {
// batching_parquet_writer ties together the low-level components for iceberg to
@@ -49,7 +50,8 @@ class batching_parquet_writer : public data_writer {
ss::future<data_writer_error>
add_data_struct(iceberg::struct_value data, int64_t approx_size) override;

ss::future<result<data_writer_result, data_writer_error>> finish() override;
ss::future<result<coordinator::data_file, data_writer_error>>
finish() override;

// Close the file handle, delete any temporary data and clean up any other
// state.
@@ -72,7 +74,25 @@ class batching_parquet_writer : public data_writer {
std::filesystem::path _output_file_path;
ss::file _output_file;
ss::output_stream<char> _output_stream;
data_writer_result _result;
coordinator::data_file _result;
};

class batching_parquet_writer_factory : public data_writer_factory {
Contributor:

Optional: could have also made this a nested class, so it would be batching_parquet_writer::factory (a sketch of that layout follows this file's diff).

Contributor Author:

given the number of things stacked above this, and how pervasive a change that would be, I'd rather make it a separate PR.

public:
batching_parquet_writer_factory(
std::filesystem::path local_directory,
ss::sstring file_name_prefix,
size_t row_count_threshold,
size_t byte_count_threshold);

ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
create_writer(iceberg::struct_type schema) override;

private:
std::filesystem::path _local_directory;
ss::sstring _file_name_prefix;
size_t _row_count_threshold;
size_t _byte_count_treshold;
};

} // namespace datalake
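
As noted in the comment above, a declaration-only sketch of the nested-class layout the reviewer suggested; it assumes the data_writer and data_writer_factory types from this diff and is not what the PR actually does.

```cpp
// Declaration-only sketch of the suggested batching_parquet_writer::factory
// layout; not part of this PR.
class batching_parquet_writer : public data_writer {
public:
    class factory;
    // ... writer members as declared in this diff ...
};

class batching_parquet_writer::factory : public data_writer_factory {
public:
    factory(
      std::filesystem::path local_directory,
      ss::sstring file_name_prefix,
      size_t row_count_threshold,
      size_t byte_count_threshold);

    ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
    create_writer(iceberg::struct_type schema) override;
};
```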
21 changes: 6 additions & 15 deletions src/v/datalake/data_writer_interface.h
@@ -10,6 +10,7 @@
#pragma once

#include "base/outcome.h"
#include "coordinator/data_file.h"
Contributor:

datalake/coordinator/data_file.h

#include "datalake/schemaless_translator.h"
#include "iceberg/datatypes.h"
#include "iceberg/values.h"
@@ -23,19 +24,7 @@ enum class data_writer_error {
ok = 0,
parquet_conversion_error,
file_io_error,

};

struct data_writer_result
: serde::envelope<
data_writer_result,
serde::version<0>,
serde::compat_version<0>> {
ss::sstring file_path = "";
size_t record_count = 0;
size_t file_size_bytes = 0;

auto serde_fields() { return std::tie(record_count); }
no_data,
};

struct data_writer_error_category : std::error_category {
@@ -49,6 +38,8 @@ struct data_writer_error_category : std::error_category {
return "Parquet Conversion Error";
case data_writer_error::file_io_error:
return "File IO Error";
case data_writer_error::no_data:
return "No data";
}
}

@@ -70,15 +61,15 @@ class data_writer {
iceberg::struct_value /* data */, int64_t /* approx_size */)
= 0;

virtual ss::future<result<data_writer_result, data_writer_error>>
virtual ss::future<result<coordinator::data_file, data_writer_error>>
finish() = 0;
};

class data_writer_factory {
public:
virtual ~data_writer_factory() = default;

virtual std::unique_ptr<data_writer>
virtual ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
create_writer(iceberg::struct_type /* schema */) = 0;
};

59 changes: 44 additions & 15 deletions src/v/datalake/record_multiplexer.cc
@@ -10,7 +10,9 @@
#include "datalake/record_multiplexer.h"

#include "datalake/data_writer_interface.h"
#include "datalake/logger.h"
#include "datalake/schemaless_translator.h"
#include "datalake/tests/test_data_writer.h"
#include "iceberg/values.h"
#include "model/record.h"
#include "storage/parser_utils.h"
@@ -44,54 +46,81 @@
+ record.offset_delta();
int64_t estimated_size = key.size_bytes() + val.size_bytes() + 16;

// TODO: we want to ensure we're using an offset translating reader so
// that these will be Kafka offsets, not Raft offsets.
if (!_result.has_value()) {
_result = coordinator::translated_offset_range{};
_result.value().start_offset = kafka::offset(offset);
}
if (offset < _result.value().start_offset()) {
_result.value().start_offset = kafka::offset(offset);
}
if (offset > _result.value().start_offset()) {
_result.value().last_offset = kafka::offset(offset);
}

// Translate the record
auto& translator = get_translator();
iceberg::struct_value data = translator.translate_event(
std::move(key), std::move(val), timestamp, offset);
// Send it to the writer
auto& writer = get_writer();
data_writer_error writer_status = co_await writer.add_data_struct(

auto writer_result = co_await get_writer();
if (!writer_result.has_value()) {
_writer_status = writer_result.error();
co_return ss::stop_iteration::yes;
}
auto writer = std::move(writer_result.value());
_writer_status = co_await writer->add_data_struct(
std::move(data), estimated_size);
if (writer_status != data_writer_error::ok) {
if (_writer_status != data_writer_error::ok) {
// If a write fails, the writer is left in an indeterminate state,
// we cannot continue in this case.
_writer_status = writer_status;
co_return ss::stop_iteration::yes;
}
}
co_return ss::stop_iteration::no;
}

ss::future<result<chunked_vector<data_writer_result>, data_writer_error>>
ss::future<result<coordinator::translated_offset_range, data_writer_error>>
record_multiplexer::end_of_stream() {
if (_writer_status != data_writer_error::ok) {
co_return _writer_status;
}
// TODO: once we have multiple _writers this should be a loop
if (_writer) {
chunked_vector<data_writer_result> ret;
auto res = co_await _writer->finish();
if (res.has_value()) {
ret.push_back(res.value());
co_return ret;
if (!_result.has_value()) {
co_return data_writer_error::no_data;
}
auto result_files = co_await _writer->finish();
if (result_files.has_value()) {
_result.value().files.push_back(result_files.value());
co_return std::move(_result.value());
} else {
co_return res.error();
co_return result_files.error();
}
} else {
co_return chunked_vector<data_writer_result>{};
co_return data_writer_error::no_data;
}
}

schemaless_translator& record_multiplexer::get_translator() {
return _translator;
}

data_writer& record_multiplexer::get_writer() {
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
record_multiplexer::get_writer() {
if (!_writer) {
auto& translator = get_translator();
auto schema = translator.get_schema();
_writer = _writer_factory->create_writer(std::move(schema));
auto writer_result = co_await _writer_factory->create_writer(
std::move(schema));
if (!writer_result.has_value()) {
co_return writer_result.error();
}
_writer = writer_result.value();
co_return _writer;
}
return *_writer;
co_return _writer;
}
} // namespace datalake
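
Finally, a wiring sketch for the multiplexer. The record_multiplexer constructor (taking the writer factory) and the datalake::coordinator namespace qualification are assumptions; only operator() and end_of_stream() appear in this diff.

```cpp
// Assumed wiring, not from this diff: drive the multiplexer over a set of
// batches and collect the translated offset range it produces.
#include "datalake/record_multiplexer.h"

#include <seastar/core/coroutine.hh>

#include <memory>
#include <vector>

ss::future<result<datalake::coordinator::translated_offset_range,
                  datalake::data_writer_error>>
translate_batches(
  std::vector<model::record_batch> batches,
  std::unique_ptr<datalake::data_writer_factory> factory) {
    datalake::record_multiplexer mux(std::move(factory)); // assumed ctor
    for (auto& batch : batches) {
        auto stop = co_await mux(std::move(batch));
        if (stop == ss::stop_iteration::yes) {
            break; // a writer error occurred; end_of_stream() reports it
        }
    }
    co_return co_await mux.end_of_stream();
}
```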