Merge pull request #23683 from jcipar/jcipar/batching-writer-factory
iceberg: add batching parquet writer factory
ivotron authored Oct 11, 2024
2 parents f0bfb1f + 06e20c6 commit 7ff0704
Showing 9 changed files with 238 additions and 48 deletions.
64 changes: 61 additions & 3 deletions src/v/datalake/batching_parquet_writer.cc
@@ -10,9 +10,11 @@

#include "datalake/batching_parquet_writer.h"

#include "base/vlog.h"
#include "bytes/iostream.h"
#include "datalake/arrow_translator.h"
#include "datalake/data_writer_interface.h"
#include "datalake/logger.h"

#include <seastar/core/file-types.hh>
#include <seastar/core/fstream.hh>
@@ -22,6 +24,9 @@
#include <seastar/coroutine/as_future.hh>

#include <cstdint>
#include <exception>
#include <memory>
#include <utility>

namespace datalake {

@@ -43,20 +48,30 @@ batching_parquet_writer::initialize(std::filesystem::path output_file_path) {
ss::open_flags::create | ss::open_flags::truncate
| ss::open_flags::wo);
} catch (...) {
vlog(
datalake_log.error,
"Error opening output file {}: {}",
_output_file_path,
std::current_exception());
co_return data_writer_error::file_io_error;
}
bool error = false;
try {
_output_stream = co_await ss::make_file_output_stream(_output_file);
} catch (...) {
vlog(
datalake_log.error,
"Error making output stream for file {}: {}",
_output_file_path,
std::current_exception());
error = true;
}
if (error) {
co_await _output_file.close();
co_return data_writer_error::file_io_error;
}

_result.file_path = _output_file_path.string();
_result.remote_path = _output_file_path.string();
co_return data_writer_error::ok;
}
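An aside on the "bool error = false" try/catch shape used throughout this file: C++ forbids co_await inside a catch handler, so cleanup that must itself be awaited (such as closing the file) is deferred via the flag. A minimal standalone sketch of the pattern, using Seastar but with hypothetical names:

#include <seastar/core/coroutine.hh>
#include <seastar/core/file.hh>
#include <seastar/core/future.hh>

// Hypothetical helper illustrating the error-flag pattern: flush a file and
// close it on failure. Writing `co_await f.close()` directly inside the
// catch block would be ill-formed, so the flag defers the awaited cleanup.
seastar::future<bool> flush_or_close(seastar::file f) {
    bool error = false;
    try {
        co_await f.flush(); // any awaited operation that may throw
    } catch (...) {
        error = true; // cannot co_await inside a handler
    }
    if (error) {
        co_await f.close(); // awaited cleanup, outside the handler
        co_return false;
    }
    co_return true;
}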

@@ -66,6 +81,10 @@ ss::future<data_writer_error> batching_parquet_writer::add_data_struct(
try {
_iceberg_to_arrow.add_data(std::move(data));
} catch (...) {
vlog(
datalake_log.error,
"Error adding data value to Arrow table: {}",
std::current_exception());
error = true;
}
if (error) {
@@ -83,7 +102,7 @@ ss::future<data_writer_error> batching_parquet_writer::add_data_struct(
co_return data_writer_error::ok;
}

ss::future<result<data_writer_result, data_writer_error>>
ss::future<result<coordinator::data_file, data_writer_error>>
batching_parquet_writer::finish() {
auto write_result = co_await write_row_group();
if (write_result != data_writer_error::ok) {
@@ -96,6 +115,10 @@ batching_parquet_writer::finish() {
out = _arrow_to_iobuf.close_and_take_iobuf();
_result.file_size_bytes += out.size_bytes();
} catch (...) {
vlog(
datalake_log.error,
"Error closing arrow_to_iobuf stream: {}",
std::current_exception());
error = true;
}
if (error) {
@@ -107,6 +130,10 @@
co_await write_iobuf_to_output_stream(std::move(out), _output_stream);
co_await _output_stream.close();
} catch (...) {
vlog(
datalake_log.error,
"Error closing output stream: {}",
std::current_exception());
error = true;
}
if (error) {
@@ -126,13 +153,17 @@ ss::future<data_writer_error> batching_parquet_writer::write_row_group() {
iobuf out;
try {
auto chunk = _iceberg_to_arrow.take_chunk();
_result.record_count += _row_count;
_result.row_count += _row_count;
_row_count = 0;
_byte_count = 0;
_arrow_to_iobuf.add_arrow_array(chunk);
out = _arrow_to_iobuf.take_iobuf();
_result.file_size_bytes += out.size_bytes();
} catch (...) {
vlog(
datalake_log.error,
"Error converting Arrow to Parquet iobuf: {}",
std::current_exception());
error = true;
}
if (error) {
@@ -142,6 +173,10 @@
try {
co_await write_iobuf_to_output_stream(std::move(out), _output_stream);
} catch (...) {
vlog(
datalake_log.error,
"Error writing to output stream: {}",
std::current_exception());
error = true;
}
if (error) {
Expand All @@ -159,4 +194,27 @@ ss::future<> batching_parquet_writer::abort() {
}
}

batching_parquet_writer_factory::batching_parquet_writer_factory(
std::filesystem::path local_directory,
ss::sstring file_name_prefix,
size_t row_count_threshold,
size_t byte_count_threshold)
: _local_directory{std::move(local_directory)}
, _file_name_prefix{std::move(file_name_prefix)}
, _row_count_threshold{row_count_threshold}
, _byte_count_threshold{byte_count_threshold} {}

ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
batching_parquet_writer_factory::create_writer(iceberg::struct_type schema) {
auto ret = ss::make_shared<batching_parquet_writer>(
std::move(schema), _row_count_threshold, _byte_count_threshold);
std::string filename = fmt::format(
"{}-{}.parquet", _file_name_prefix, uuid_t::create());
std::filesystem::path file_path = _local_directory / filename;
auto err = co_await ret->initialize(file_path);
if (err != data_writer_error::ok) {
co_return err;
}
co_return ret;
}
} // namespace datalake
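As an illustration of the naming scheme in create_writer, here is a plain-C++ sketch (no Seastar; hypothetical names and paths) where a random hex string stands in for uuid_t::create(). Each writer gets <local_directory>/<prefix>-<unique id>.parquet, so concurrent writers never collide on a path:

#include <filesystem>
#include <iostream>
#include <random>
#include <sstream>
#include <string>

// Stand-in for uuid_t::create(): 128 random bits, hex-encoded.
std::string pseudo_uuid() {
    std::mt19937_64 gen{std::random_device{}()};
    std::ostringstream out;
    out << std::hex << gen() << gen();
    return out.str();
}

int main() {
    std::filesystem::path local_directory = "/tmp/datalake";
    std::string file_name_prefix = "rp";
    // Mirrors fmt::format("{}-{}.parquet", _file_name_prefix, uuid_t::create())
    std::filesystem::path file_path
      = local_directory / (file_name_prefix + "-" + pseudo_uuid() + ".parquet");
    std::cout << file_path << '\n'; // e.g. "/tmp/datalake/rp-9f86d081884c7d65.parquet"
}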
24 changes: 22 additions & 2 deletions src/v/datalake/batching_parquet_writer.h
@@ -21,6 +22,7 @@
#include <base/seastarx.h>

#include <filesystem>
#include <memory>

namespace datalake {
// batching_parquet_writer ties together the low-level components for iceberg to
@@ -49,7 +50,8 @@ class batching_parquet_writer : public data_writer {
ss::future<data_writer_error>
add_data_struct(iceberg::struct_value data, int64_t approx_size) override;

ss::future<result<data_writer_result, data_writer_error>> finish() override;
ss::future<result<coordinator::data_file, data_writer_error>>
finish() override;

// Close the file handle, delete any temporary data and clean up any other
// state.
@@ -72,7 +74,25 @@
std::filesystem::path _output_file_path;
ss::file _output_file;
ss::output_stream<char> _output_stream;
data_writer_result _result;
coordinator::data_file _result;
};

class batching_parquet_writer_factory : public data_writer_factory {
public:
batching_parquet_writer_factory(
std::filesystem::path local_directory,
ss::sstring file_name_prefix,
size_t row_count_threshold,
size_t byte_count_threshold);

ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
create_writer(iceberg::struct_type schema) override;

private:
std::filesystem::path _local_directory;
ss::sstring _file_name_prefix;
size_t _row_count_threshold;
size_t _byte_count_threshold;
};

} // namespace datalake
21 changes: 6 additions & 15 deletions src/v/datalake/data_writer_interface.h
@@ -10,6 +10,7 @@
#pragma once

#include "base/outcome.h"
#include "coordinator/data_file.h"
#include "datalake/schemaless_translator.h"
#include "iceberg/datatypes.h"
#include "iceberg/values.h"
@@ -23,19 +24,7 @@ enum class data_writer_error {
ok = 0,
parquet_conversion_error,
file_io_error,

};

struct data_writer_result
: serde::envelope<
data_writer_result,
serde::version<0>,
serde::compat_version<0>> {
ss::sstring file_path = "";
size_t record_count = 0;
size_t file_size_bytes = 0;

auto serde_fields() { return std::tie(record_count); }
no_data,
};

struct data_writer_error_category : std::error_category {
@@ -49,6 +38,8 @@ struct data_writer_error_category : std::error_category {
return "Parquet Conversion Error";
case data_writer_error::file_io_error:
return "File IO Error";
case data_writer_error::no_data:
return "No data";
}
}
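The message() switch above follows the standard std::error_category pattern. A self-contained sketch (hypothetical names, not the actual Redpanda wiring) of how such an enum round-trips through std::error_code:

#include <iostream>
#include <string>
#include <system_error>

enum class writer_error { ok = 0, parquet_conversion_error, file_io_error, no_data };

struct writer_error_category final : std::error_category {
    const char* name() const noexcept override { return "writer_error"; }
    std::string message(int ev) const override {
        switch (static_cast<writer_error>(ev)) {
        case writer_error::ok:
            return "Ok";
        case writer_error::parquet_conversion_error:
            return "Parquet Conversion Error";
        case writer_error::file_io_error:
            return "File IO Error";
        case writer_error::no_data:
            return "No data";
        }
        return "Unknown";
    }
};

std::error_code make_error_code(writer_error e) {
    static writer_error_category cat; // single category instance
    return {static_cast<int>(e), cat};
}

int main() {
    std::error_code ec = make_error_code(writer_error::no_data);
    std::cout << ec.category().name() << ": " << ec.message() << '\n';
}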

@@ -70,15 +61,15 @@ class data_writer {
iceberg::struct_value /* data */, int64_t /* approx_size */)
= 0;

virtual ss::future<result<data_writer_result, data_writer_error>>
virtual ss::future<result<coordinator::data_file, data_writer_error>>
finish() = 0;
};

class data_writer_factory {
public:
virtual ~data_writer_factory() = default;

virtual std::unique_ptr<data_writer>
virtual ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
create_writer(iceberg::struct_type /* schema */) = 0;
};

59 changes: 44 additions & 15 deletions src/v/datalake/record_multiplexer.cc
Expand Up @@ -10,7 +10,9 @@
#include "datalake/record_multiplexer.h"

#include "datalake/data_writer_interface.h"
#include "datalake/logger.h"
#include "datalake/schemaless_translator.h"
#include "datalake/tests/test_data_writer.h"
#include "iceberg/values.h"
#include "model/record.h"
#include "storage/parser_utils.h"
@@ -44,54 +46,81 @@ record_multiplexer::operator()(model::record_batch batch) {
+ record.offset_delta();
int64_t estimated_size = key.size_bytes() + val.size_bytes() + 16;

// TODO: we want to ensure we're using an offset translating reader so
// that these will be Kafka offsets, not Raft offsets.
if (!_result.has_value()) {
_result = coordinator::translated_offset_range{};
_result.value().start_offset = kafka::offset(offset);
}
if (offset < _result.value().start_offset()) {
_result.value().start_offset = kafka::offset(offset);
}
if (offset > _result.value().start_offset()) {
_result.value().last_offset = kafka::offset(offset);
}

// Translate the record
auto& translator = get_translator();
iceberg::struct_value data = translator.translate_event(
std::move(key), std::move(val), timestamp, offset);
// Send it to the writer
auto& writer = get_writer();
data_writer_error writer_status = co_await writer.add_data_struct(

auto writer_result = co_await get_writer();
if (!writer_result.has_value()) {
_writer_status = writer_result.error();
co_return ss::stop_iteration::yes;
}
auto writer = std::move(writer_result.value());
_writer_status = co_await writer->add_data_struct(
std::move(data), estimated_size);
if (writer_status != data_writer_error::ok) {
if (_writer_status != data_writer_error::ok) {
// If a write fails, the writer is left in an indeterminate state;
// we cannot continue in this case.
_writer_status = writer_status;
co_return ss::stop_iteration::yes;
}
}
co_return ss::stop_iteration::no;
}
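A standalone sketch of the offset-range bookkeeping in operator() above: the first record seeds start_offset, and later records widen the [start_offset, last_offset] range. Plain int64_t stands in for kafka::offset, and this sketch seeds last_offset immediately, whereas the code above only sets it once a later offset arrives:

#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

struct offset_range {
    int64_t start_offset;
    int64_t last_offset;
};

int main() {
    std::optional<offset_range> result;
    for (int64_t offset : std::vector<int64_t>{7, 8, 9, 10}) {
        if (!result.has_value()) {
            // First record seen: seed the range.
            result = offset_range{offset, offset};
        }
        if (offset < result->start_offset) {
            result->start_offset = offset;
        }
        if (offset > result->start_offset) {
            result->last_offset = offset;
        }
    }
    std::cout << result->start_offset << ".." << result->last_offset
              << '\n'; // prints "7..10"
}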

ss::future<result<chunked_vector<data_writer_result>, data_writer_error>>
ss::future<result<coordinator::translated_offset_range, data_writer_error>>
record_multiplexer::end_of_stream() {
if (_writer_status != data_writer_error::ok) {
co_return _writer_status;
}
// TODO: once we have multiple _writers this should be a loop
if (_writer) {
chunked_vector<data_writer_result> ret;
auto res = co_await _writer->finish();
if (res.has_value()) {
ret.push_back(res.value());
co_return ret;
if (!_result.has_value()) {
co_return data_writer_error::no_data;
}
auto result_files = co_await _writer->finish();
if (result_files.has_value()) {
_result.value().files.push_back(result_files.value());
co_return std::move(_result.value());
} else {
co_return res.error();
co_return result_files.error();
}
} else {
co_return chunked_vector<data_writer_result>{};
co_return data_writer_error::no_data;
}
}

schemaless_translator& record_multiplexer::get_translator() {
return _translator;
}

data_writer& record_multiplexer::get_writer() {
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
record_multiplexer::get_writer() {
if (!_writer) {
auto& translator = get_translator();
auto schema = translator.get_schema();
_writer = _writer_factory->create_writer(std::move(schema));
auto writer_result = co_await _writer_factory->create_writer(
std::move(schema));
if (!writer_result.has_value()) {
co_return writer_result.error();
}
_writer = writer_result.value();
co_return _writer;
}
return *_writer;
co_return _writer;
}
} // namespace datalake