Skip to content

Commit ddcc97a

Browse files
authored
Merge pull request #27711 from mmaslankaprv/cl-sg-follow-up
Use shadow linking scheduling group in replicator and topic reconciler
2 parents 0f6d77c + 7d64da0 commit ddcc97a

File tree

7 files changed

+25
-8
lines changed

7 files changed

+25
-8
lines changed

src/v/cluster_link/manager.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,8 @@ ss::future<> manager::start_topic_reconciler() {
585585
_topic_metadata_cache.get(),
586586
_registry.get(),
587587
topic_reconciler_interval,
588-
_default_topic_replication);
588+
_default_topic_replication,
589+
_scheduling_group);
589590
}
590591
try {
591592
co_await _topic_reconciler->start();

src/v/cluster_link/replication/link_replication_mgr.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ ss::future<> link_replication_manager::do_start_replicator(
6161
auto source = _source_factory->make_source(ntp);
6262
auto sink = _sink_factory->make_sink(ntp);
6363
auto replicator = std::make_unique<partition_replicator>(
64-
ntp, term, std::move(source), std::move(sink));
64+
ntp, term, std::move(source), std::move(sink), _sg);
6565
auto [r_it, _] = _replicators.emplace(ntp, std::move(replicator));
6666
co_await r_it->second->start();
6767
}

src/v/cluster_link/replication/partition_replicator.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#include "cluster_link/logger.h"
1414
#include "ssx/future-util.h"
1515

16+
#include <seastar/coroutine/switch_to.hh>
17+
1618
namespace cluster_link::replication {
1719

1820
static constexpr std::chrono::seconds base_backoff{1};
@@ -22,16 +24,19 @@ partition_replicator::partition_replicator(
2224
const model::ntp& ntp,
2325
model::term_id term,
2426
std::unique_ptr<data_source> source,
25-
std::unique_ptr<data_sink> sink)
27+
std::unique_ptr<data_sink> sink,
28+
ss::scheduling_group scheduling_group)
2629
: _term(term)
2730
, _log(cllog, fmt::format("[{}-term-{}] replicator", ntp, term))
2831
, _source(std::move(source))
2932
, _sink(std::move(sink))
33+
, _scheduling_group(scheduling_group)
3034
, _backoff_policy(
3135
make_exponential_backoff_policy<ss::lowres_clock>(
3236
base_backoff, max_backoff)) {}
3337

3438
ss::future<> partition_replicator::start() {
39+
co_await ss::coroutine::switch_to(_scheduling_group);
3540
vlog(_log.trace, "Starting replicator");
3641
co_await _sink->start();
3742
co_await _source->start(
@@ -48,6 +53,7 @@ ss::future<> partition_replicator::start() {
4853
}
4954

5055
ss::future<> partition_replicator::stop() {
56+
co_await ss::coroutine::switch_to(_scheduling_group);
5157
vlog(_log.trace, "Stopping replicator");
5258
_as.request_abort();
5359
// closing the gate first ensures all the units are returned to the

src/v/cluster_link/replication/partition_replicator.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ class partition_replicator {
5858
const ::model::ntp& ntp,
5959
::model::term_id,
6060
std::unique_ptr<data_source> source,
61-
std::unique_ptr<data_sink> sink);
61+
std::unique_ptr<data_sink> sink,
62+
ss::scheduling_group sg = ss::default_scheduling_group());
6263
ss::future<> start();
6364
ss::future<> stop();
6465

@@ -88,6 +89,7 @@ class partition_replicator {
8889
ss::abort_source _as;
8990
std::unique_ptr<data_source> _source;
9091
std::unique_ptr<data_sink> _sink;
92+
ss::scheduling_group _scheduling_group;
9193
// to pipeline multiple replicate requests in parallel
9294
static constexpr ssize_t max_in_flight_requests = 5;
9395
ssx::semaphore _max_requests{

src/v/cluster_link/tests/topic_reconciler_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ class topic_reconciler_test : public seastar_test {
5454
_ftmc.get(),
5555
_link_registry.get(),
5656
1s,
57-
_default_topic_replication.bind());
57+
_default_topic_replication.bind(),
58+
ss::default_scheduling_group());
5859
co_await _reconciler->start();
5960

6061
set_required_topic_properties(

src/v/cluster_link/topic_reconciler.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#include "model/fundamental.h"
1919
#include "ssx/future-util.h"
2020

21+
#include <seastar/coroutine/switch_to.hh>
22+
2123
#include <exception>
2224

2325
using namespace std::chrono_literals;
@@ -28,12 +30,14 @@ topic_reconciler::topic_reconciler(
2830
kafka::data::rpc::topic_metadata_cache* topic_metadata_cache,
2931
link_registry* link_registry,
3032
ss::lowres_clock::duration run_interval,
31-
config::binding<int16_t> default_topic_replication)
33+
config::binding<int16_t> default_topic_replication,
34+
ss::scheduling_group scheduling_group)
3235
: _topic_creator(topic_creator)
3336
, _topic_metadata_cache(topic_metadata_cache)
3437
, _link_registry(link_registry)
3538
, _run_interval(run_interval)
36-
, _default_topic_replication(std::move(default_topic_replication)) {}
39+
, _default_topic_replication(std::move(default_topic_replication))
40+
, _scheduling_group(scheduling_group) {}
3741

3842
ss::future<> topic_reconciler::start() {
3943
vlog(cllog.info, "Starting topic reconciler");
@@ -79,6 +83,7 @@ void topic_reconciler::trigger(model::id_t link_id) {
7983
}
8084

8185
ss::future<> topic_reconciler::execute(std::optional<model::id_t> link_id) {
86+
co_await ss::coroutine::switch_to(_scheduling_group);
8287
static constexpr auto max_concurrent_reconcilations = 5;
8388
vlog(cllog.trace, "Executing topic reconciler, link_id: {}", link_id);
8489
auto units = co_await _reconciler_mutex.get_units(_as);

src/v/cluster_link/topic_reconciler.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ class topic_reconciler {
2828
kafka::data::rpc::topic_metadata_cache* topic_metadata_cache,
2929
link_registry* link_registry,
3030
ss::lowres_clock::duration run_interval,
31-
config::binding<int16_t> default_topic_replication);
31+
config::binding<int16_t> default_topic_replication,
32+
ss::scheduling_group sg);
3233
topic_reconciler(const topic_reconciler&) = delete;
3334
topic_reconciler(topic_reconciler&&) = delete;
3435
topic_reconciler& operator=(const topic_reconciler&) = delete;
@@ -79,6 +80,7 @@ class topic_reconciler {
7980
ss::timer<ss::lowres_clock> _reconciler_timer;
8081
ss::lowres_clock::duration _run_interval;
8182
config::binding<int16_t> _default_topic_replication;
83+
ss::scheduling_group _scheduling_group;
8284
ss::abort_source _as;
8385
ss::gate _gate;
8486
};

0 commit comments

Comments
 (0)