Skip to content

Commit 56d88d8

Browse files
committed
datalake: multiplexer handles failure to create writer
Previously, a failure to create a data writer was handled through a try/ catch. This changes that to a result type, since that's our preferred error handling for the higher-level parts of the code. This requires changing the type for the writer from std::unique_ptr to ss::shared_ptr so it can be returned in a result (previously it was returned by reference).
1 parent 9d23521 commit 56d88d8

6 files changed

+24
-21
lines changed

src/v/datalake/batching_parquet_writer.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -186,9 +186,9 @@ batching_parquet_writer_factory::batching_parquet_writer_factory(
186186
, _row_count_threshold{row_count_threshold}
187187
, _byte_count_treshold{byte_count_threshold} {}
188188

189-
ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
189+
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
190190
batching_parquet_writer_factory::create_writer(iceberg::struct_type schema) {
191-
auto ret = std::make_unique<batching_parquet_writer>(
191+
auto ret = ss::make_shared<batching_parquet_writer>(
192192
std::move(schema), _row_count_threshold, _byte_count_treshold);
193193
uuid_t file_uuid = uuid_t::create();
194194
std::stringstream filename_stream;

src/v/datalake/batching_parquet_writer.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class batching_parquet_writer_factory : public data_writer_factory {
8383
size_t row_count_threshold,
8484
size_t byte_count_threshold);
8585

86-
ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
86+
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
8787
create_writer(iceberg::struct_type schema) override;
8888

8989
private:

src/v/datalake/data_writer_interface.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class data_writer_factory {
7777
public:
7878
virtual ~data_writer_factory() = default;
7979

80-
virtual ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
80+
virtual ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
8181
create_writer(iceberg::struct_type /* schema */) = 0;
8282
};
8383

src/v/datalake/record_multiplexer.cc

+15-13
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,14 @@ record_multiplexer::operator()(model::record_batch batch) {
5252
std::move(key), std::move(val), timestamp, offset);
5353
// Send it to the writer
5454

55-
try {
56-
auto& writer = co_await get_writer();
57-
_writer_status = co_await writer.add_data_struct(
58-
std::move(data), estimated_size);
59-
} catch (const std::runtime_error& err) {
60-
datalake_log.error("Failed to add data to writer");
61-
55+
auto writer_result = co_await get_writer();
56+
if (!writer_result.has_value()) {
57+
_writer_status = writer_result.error();
58+
co_return ss::stop_iteration::yes;
6259
}
60+
auto& writer = writer_result.value();
61+
_writer_status = co_await writer->add_data_struct(
62+
std::move(data), estimated_size);
6363
if (_writer_status != data_writer_error::ok) {
6464
// If a write fails, the writer is left in an indeterminate state,
6565
// we cannot continue in this case.
@@ -93,17 +93,19 @@ schemaless_translator& record_multiplexer::get_translator() {
9393
return _translator;
9494
}
9595

96-
ss::future<data_writer&> record_multiplexer::get_writer() {
96+
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
97+
record_multiplexer::get_writer() {
9798
if (!_writer) {
9899
auto& translator = get_translator();
99100
auto schema = translator.get_schema();
100-
auto writer_result = co_await _writer_factory->create_writer(std::move(schema));
101+
auto writer_result = co_await _writer_factory->create_writer(
102+
std::move(schema));
101103
if (!writer_result.has_value()) {
102-
// FIXME: handle this error correctly
103-
throw std::runtime_error("Could not create data writer");
104+
co_return writer_result.error();
104105
}
105-
_writer = std::move(writer_result.value());
106+
_writer = writer_result.value();
107+
co_return _writer;
106108
}
107-
co_return *_writer;
109+
co_return _writer;
108110
}
109111
} // namespace datalake

src/v/datalake/record_multiplexer.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,15 @@ class record_multiplexer {
4747

4848
private:
4949
schemaless_translator& get_translator();
50-
ss::future<data_writer&> get_writer();
50+
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
51+
get_writer();
5152

5253
// TODO: in a future PR this will be a map of translators keyed by schema_id
5354
schemaless_translator _translator;
5455
std::unique_ptr<data_writer_factory> _writer_factory;
5556

5657
// TODO: similarly this will be a map keyed by schema_id
57-
std::unique_ptr<data_writer> _writer;
58+
ss::shared_ptr<data_writer> _writer;
5859

5960
data_writer_error _writer_status = data_writer_error::ok;
6061
};

src/v/datalake/tests/test_data_writer.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ class test_data_writer_factory : public data_writer_factory {
5252
explicit test_data_writer_factory(bool return_error)
5353
: _return_error{return_error} {}
5454

55-
ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
55+
ss::future<result<ss::shared_ptr<data_writer>, data_writer_error>>
5656
create_writer(iceberg::struct_type schema) override {
57-
co_return std::make_unique<test_data_writer>(
57+
co_return ss::make_shared<test_data_writer>(
5858
std::move(schema), _return_error);
5959
}
6060

0 commit comments

Comments
 (0)