Skip to content

Commit

Permalink
datalake: multiplexer handles failure to create writer
Browse files Browse the repository at this point in the history
Previously, a failure to create a data writer was handled through a
try/catch. This changes that to a result type, since that's our preferred
error handling for the higher-level parts of the code. This requires
changing the type for the writer from std::unique_ptr to ss::shared_ptr
so it can be returned in a result (previously it was returned by
reference).
  • Loading branch information
jcipar committed Oct 9, 2024
1 parent 56a10d9 commit 79062fb
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 20 deletions.
4 changes: 2 additions & 2 deletions src/v/datalake/batching_parquet_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ batching_parquet_writer_factory::batching_parquet_writer_factory(
, _row_count_threshold{row_count_threshold}
, _byte_count_treshold{byte_count_threshold} {}

ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
batching_parquet_writer_factory::create_writer(iceberg::struct_type schema) {
auto ret = std::make_unique<batching_parquet_writer>(
auto ret = ss::make_shared<batching_parquet_writer>(
std::move(schema), _row_count_threshold, _byte_count_treshold);
std::string filename = fmt::format("{}.parquet", uuid_t::create());
std::filesystem::path file_path = _local_directory / filename;
Expand Down
2 changes: 1 addition & 1 deletion src/v/datalake/batching_parquet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class batching_parquet_writer_factory : public data_writer_factory {
size_t row_count_threshold,
size_t byte_count_threshold);

ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
create_writer(iceberg::struct_type schema) override;

private:
Expand Down
2 changes: 1 addition & 1 deletion src/v/datalake/data_writer_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class data_writer_factory {
public:
virtual ~data_writer_factory() = default;

virtual ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
virtual ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
create_writer(iceberg::struct_type /* schema */) = 0;
};

Expand Down
27 changes: 15 additions & 12 deletions src/v/datalake/record_multiplexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ record_multiplexer::operator()(model::record_batch batch) {
std::move(key), std::move(val), timestamp, offset);
// Send it to the writer

try {
auto& writer = co_await get_writer();
_writer_status = co_await writer.add_data_struct(
std::move(data), estimated_size);
} catch (const std::runtime_error& err) {
datalake_log.error("Failed to add data to writer");
auto writer_result = co_await get_writer();
if (!writer_result.has_value()) {
_writer_status = writer_result.error();
co_return ss::stop_iteration::yes;
}
auto writer = std::move(writer_result.value());
_writer_status = co_await writer->add_data_struct(
std::move(data), estimated_size);
if (_writer_status != data_writer_error::ok) {
// If a write fails, the writer is left in an indeterminate state,
// we cannot continue in this case.
Expand Down Expand Up @@ -92,17 +93,19 @@ schemaless_translator& record_multiplexer::get_translator() {
return _translator;
}

ss::future<data_writer&> record_multiplexer::get_writer() {
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
record_multiplexer::get_writer() {
if (!_writer) {
auto& translator = get_translator();
auto schema = translator.get_schema();
auto writer_result = co_await _writer_factory->create_writer(std::move(schema));
auto writer_result = co_await _writer_factory->create_writer(
std::move(schema));
if (!writer_result.has_value()) {
// FIXME: handle this error correctly
throw std::runtime_error("Could not create data writer");
co_return writer_result.error();
}
_writer = std::move(writer_result.value());
_writer = writer_result.value();
co_return _writer;
}
co_return *_writer;
co_return _writer;
}
} // namespace datalake
5 changes: 3 additions & 2 deletions src/v/datalake/record_multiplexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ class record_multiplexer {

private:
schemaless_translator& get_translator();
ss::future<data_writer&> get_writer();
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
get_writer();

// TODO: in a future PR this will be a map of translators keyed by schema_id
schemaless_translator _translator;
std::unique_ptr<data_writer_factory> _writer_factory;

// TODO: similarly this will be a map keyed by schema_id
std::unique_ptr<data_writer> _writer;
ss::shared_ptr<data_writer> _writer;

data_writer_error _writer_status = data_writer_error::ok;
};
Expand Down
4 changes: 2 additions & 2 deletions src/v/datalake/tests/test_data_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ class test_data_writer_factory : public data_writer_factory {
explicit test_data_writer_factory(bool return_error)
: _return_error{return_error} {}

ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
create_writer(iceberg::struct_type schema) override {
co_return std::make_unique<test_data_writer>(
co_return ss::make_shared<test_data_writer>(
std::move(schema), _return_error);
}

Expand Down

0 comments on commit 79062fb

Please sign in to comment.