Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/v/cluster_link/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,8 @@ ss::future<> manager::start_topic_reconciler() {
_topic_metadata_cache.get(),
_registry.get(),
topic_reconciler_interval,
_default_topic_replication);
_default_topic_replication,
_scheduling_group);
}
try {
co_await _topic_reconciler->start();
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster_link/replication/link_replication_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ ss::future<> link_replication_manager::do_start_replicator(
auto source = _source_factory->make_source(ntp);
auto sink = _sink_factory->make_sink(ntp);
auto replicator = std::make_unique<partition_replicator>(
ntp, term, std::move(source), std::move(sink));
ntp, term, std::move(source), std::move(sink), _sg);
auto [r_it, _] = _replicators.emplace(ntp, std::move(replicator));
co_await r_it->second->start();
}
Expand Down
8 changes: 7 additions & 1 deletion src/v/cluster_link/replication/partition_replicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include "cluster_link/logger.h"
#include "ssx/future-util.h"

#include <seastar/coroutine/switch_to.hh>

namespace cluster_link::replication {

static constexpr std::chrono::seconds base_backoff{1};
Expand All @@ -22,16 +24,19 @@ partition_replicator::partition_replicator(
const model::ntp& ntp,
model::term_id term,
std::unique_ptr<data_source> source,
std::unique_ptr<data_sink> sink)
std::unique_ptr<data_sink> sink,
ss::scheduling_group scheduling_group)
: _term(term)
, _log(cllog, fmt::format("[{}-term-{}] replicator", ntp, term))
, _source(std::move(source))
, _sink(std::move(sink))
, _scheduling_group(scheduling_group)
, _backoff_policy(
make_exponential_backoff_policy<ss::lowres_clock>(
base_backoff, max_backoff)) {}

ss::future<> partition_replicator::start() {
co_await ss::coroutine::switch_to(_scheduling_group);
vlog(_log.trace, "Starting replicator");
co_await _sink->start();
co_await _source->start(
Expand All @@ -48,6 +53,7 @@ ss::future<> partition_replicator::start() {
}

ss::future<> partition_replicator::stop() {
co_await ss::coroutine::switch_to(_scheduling_group);
vlog(_log.trace, "Stopping replicator");
_as.request_abort();
// closing the gate first ensures all the units are returned to the
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster_link/replication/partition_replicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class partition_replicator {
const ::model::ntp& ntp,
::model::term_id,
std::unique_ptr<data_source> source,
std::unique_ptr<data_sink> sink);
std::unique_ptr<data_sink> sink,
ss::scheduling_group sg = ss::default_scheduling_group());
ss::future<> start();
ss::future<> stop();

Expand Down Expand Up @@ -88,6 +89,7 @@ class partition_replicator {
ss::abort_source _as;
std::unique_ptr<data_source> _source;
std::unique_ptr<data_sink> _sink;
ss::scheduling_group _scheduling_group;
// to pipeline multiple replicate requests in parallel
static constexpr ssize_t max_in_flight_requests = 5;
ssx::semaphore _max_requests{
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster_link/tests/topic_reconciler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class topic_reconciler_test : public seastar_test {
_ftmc.get(),
_link_registry.get(),
1s,
_default_topic_replication.bind());
_default_topic_replication.bind(),
ss::default_scheduling_group());
co_await _reconciler->start();

set_required_topic_properties(
Expand Down
9 changes: 7 additions & 2 deletions src/v/cluster_link/topic_reconciler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "model/fundamental.h"
#include "ssx/future-util.h"

#include <seastar/coroutine/switch_to.hh>

#include <exception>

using namespace std::chrono_literals;
Expand All @@ -28,12 +30,14 @@ topic_reconciler::topic_reconciler(
kafka::data::rpc::topic_metadata_cache* topic_metadata_cache,
link_registry* link_registry,
ss::lowres_clock::duration run_interval,
config::binding<int16_t> default_topic_replication)
config::binding<int16_t> default_topic_replication,
ss::scheduling_group scheduling_group)
: _topic_creator(topic_creator)
, _topic_metadata_cache(topic_metadata_cache)
, _link_registry(link_registry)
, _run_interval(run_interval)
, _default_topic_replication(std::move(default_topic_replication)) {}
, _default_topic_replication(std::move(default_topic_replication))
, _scheduling_group(scheduling_group) {}

ss::future<> topic_reconciler::start() {
vlog(cllog.info, "Starting topic reconciler");
Expand Down Expand Up @@ -79,6 +83,7 @@ void topic_reconciler::trigger(model::id_t link_id) {
}

ss::future<> topic_reconciler::execute(std::optional<model::id_t> link_id) {
co_await ss::coroutine::switch_to(_scheduling_group);
static constexpr auto max_concurrent_reconcilations = 5;
vlog(cllog.trace, "Executing topic reconciler, link_id: {}", link_id);
auto units = co_await _reconciler_mutex.get_units(_as);
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster_link/topic_reconciler.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class topic_reconciler {
kafka::data::rpc::topic_metadata_cache* topic_metadata_cache,
link_registry* link_registry,
ss::lowres_clock::duration run_interval,
config::binding<int16_t> default_topic_replication);
config::binding<int16_t> default_topic_replication,
ss::scheduling_group sg);
topic_reconciler(const topic_reconciler&) = delete;
topic_reconciler(topic_reconciler&&) = delete;
topic_reconciler& operator=(const topic_reconciler&) = delete;
Expand Down Expand Up @@ -79,6 +80,7 @@ class topic_reconciler {
ss::timer<ss::lowres_clock> _reconciler_timer;
ss::lowres_clock::duration _run_interval;
config::binding<int16_t> _default_topic_replication;
ss::scheduling_group _scheduling_group;
ss::abort_source _as;
ss::gate _gate;
};
Expand Down