Skip to content

Commit

Permalink
dl/translation: abort translation using task abort source
Browse files Browse the repository at this point in the history
Partition data translation may take long time, passing in abort source
to `datalake::record_multiplexer` will allow us to timely stop the
translation when partition is stopping. The translation result would be
discarded anyway as all cloud IO is already stopped.

Signed-off-by: Michał Maślanka <[email protected]>
  • Loading branch information
mmaslankaprv committed Dec 23, 2024
1 parent 38fdff4 commit 15c305b
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/v/datalake/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ redpanda_cc_library(
":table_creator",
"//src/v/base",
"//src/v/storage:parser_utils",
"//src/v/utils:lazy_abort_source",
"//src/v/utils:prefix_logger",
],
include_prefix = "datalake",
Expand Down
20 changes: 18 additions & 2 deletions src/v/datalake/record_multiplexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,27 @@ record_multiplexer::record_multiplexer(
schema_manager& schema_mgr,
type_resolver& type_resolver,
record_translator& record_translator,
table_creator& table_creator)
table_creator& table_creator,
lazy_abort_source& as)
: _log(datalake_log, fmt::format("{}", ntp))
, _ntp(ntp)
, _topic_revision(topic_revision)
, _writer_factory{std::move(writer_factory)}
, _schema_mgr(schema_mgr)
, _type_resolver(type_resolver)
, _record_translator(record_translator)
, _table_creator(table_creator) {}
, _table_creator(table_creator)
, _as(as) {}

ss::future<ss::stop_iteration>
record_multiplexer::operator()(model::record_batch batch) {
if (_as.abort_requested()) {
vlog(
_log.debug,
"Abort requested, stopping translation, reason: {}",
_as.abort_reason());
co_return ss::stop_iteration::yes;
}
if (batch.compressed()) {
batch = co_await storage::internal::decompress_batch(std::move(batch));
}
Expand All @@ -50,6 +59,13 @@ record_multiplexer::operator()(model::record_batch batch) {
auto it = model::record_batch_iterator::create(batch);

while (it.has_next()) {
if (_as.abort_requested()) {
vlog(
_log.debug,
"Abort requested, stopping translation, reason: {}",
_as.abort_reason());
co_return ss::stop_iteration::yes;
}
auto record = it.next();
auto key = record.share_key_opt();
auto val = record.share_value_opt();
Expand Down
5 changes: 4 additions & 1 deletion src/v/datalake/record_multiplexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "datalake/partitioning_writer.h"
#include "datalake/schema_identifier.h"
#include "model/record.h"
#include "utils/lazy_abort_source.h"
#include "utils/prefix_logger.h"

#include <seastar/core/future.hh>
Expand Down Expand Up @@ -47,7 +48,8 @@ class record_multiplexer {
schema_manager& schema_mgr,
type_resolver& type_resolver,
record_translator& record_translator,
table_creator&);
table_creator&,
lazy_abort_source& as);

ss::future<ss::stop_iteration> operator()(model::record_batch batch);
ss::future<result<write_result, writer_error>> end_of_stream();
Expand All @@ -71,6 +73,7 @@ class record_multiplexer {
type_resolver& _type_resolver;
record_translator& _record_translator;
table_creator& _table_creator;
lazy_abort_source& _as;
chunked_hash_map<
record_schema_components,
std::unique_ptr<partitioning_writer>>
Expand Down
10 changes: 7 additions & 3 deletions src/v/datalake/tests/gtest_record_multiplexer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const model::ntp
ntp(model::ns{"rp"}, model::topic{"t"}, model::partition_id{0});
const model::revision_id rev{123};
default_translator translator;
lazy_abort_source as([] { return std::nullopt; });
} // namespace

TEST(DatalakeMultiplexerTest, TestMultiplexer) {
Expand All @@ -53,7 +54,8 @@ TEST(DatalakeMultiplexerTest, TestMultiplexer) {
simple_schema_mgr,
bin_resolver,
translator,
t_creator);
t_creator,
as);

model::test::record_batch_spec batch_spec;
batch_spec.records = record_count;
Expand Down Expand Up @@ -93,7 +95,8 @@ TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
simple_schema_mgr,
bin_resolver,
translator,
t_creator);
t_creator,
as);

model::test::record_batch_spec batch_spec;
batch_spec.records = record_count;
Expand Down Expand Up @@ -251,7 +254,8 @@ TEST_F(RecordMultiplexerParquetTest, TestSimple) {
schema_mgr,
type_resolver,
translator,
t_creator);
t_creator,
as);
auto res = reader.consume(std::move(mux), model::no_timeout).get();
ASSERT_FALSE(res.has_error()) << res.error();
EXPECT_EQ(res.value().start_offset(), start_offset());
Expand Down
7 changes: 5 additions & 2 deletions src/v/datalake/tests/record_multiplexer_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ class record_multiplexer_bench_fixture
: _schema_mgr(catalog)
, _type_resolver(registry)
, _record_gen(&registry)
, _table_creator(_type_resolver, _schema_mgr) {}
, _table_creator(_type_resolver, _schema_mgr)
, _as([] { return std::nullopt; }) {}

template<typename T>
requires std::same_as<T, ::testing::protobuf_generator_config>
Expand Down Expand Up @@ -314,6 +315,7 @@ class record_multiplexer_bench_fixture
datalake::default_translator _translator;
datalake::direct_table_creator _table_creator;
chunked_vector<model::record_batch> _batch_data;
lazy_abort_source _as;

const model::ntp ntp{
model::ns{"rp"}, model::topic{"t"}, model::partition_id{0}};
Expand All @@ -327,7 +329,8 @@ class record_multiplexer_bench_fixture
_schema_mgr,
_type_resolver,
_translator,
_table_creator);
_table_creator,
_as);
}

ss::future<>
Expand Down
3 changes: 2 additions & 1 deletion src/v/datalake/translation_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ translation_task::translate(
*_schema_mgr,
*_type_resolver,
*_record_translator,
*_table_creator);
*_table_creator,
lazy_as);
// Write local files
auto mux_result = co_await std::move(reader).consume(
std::move(mux), _read_timeout + model::timeout_clock::now());
Expand Down

0 comments on commit 15c305b

Please sign in to comment.