Commit 58e8ddb

Merge pull request #27373 from bharathv/dr11
cluster_link/replication: add remote data source and partition sink
2 parents eb42c5b + b87d2b0, commit 58e8ddb

44 files changed: +2136 lines, -187 lines
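
Orientation note: the link constructor in the link.h and link.cc diffs below now takes a replication::data_source_factory and a replication::data_sink_factory, both declared in cluster_link/replication/deps.h, which is not among the files shown here. The sketch below is only a guess at their shape to make those diffs easier to read; the member functions and the data_source/data_sink types are assumptions, not the actual API added by this commit.

// Hypothetical sketch only: the real interfaces live in
// src/v/cluster_link/replication/deps.h and may differ in every detail.
#include <memory>

namespace cluster_link::replication {

// Reads record batches from the remote (source) cluster for one partition.
struct data_source {
    virtual ~data_source() = default;
};

// Writes replicated batches into the local mirror partition.
struct data_sink {
    virtual ~data_sink() = default;
};

// Factories injected into link and handed to link_replication_manager, so
// tests can substitute fakes for the kafka-client-backed implementations.
struct data_source_factory {
    virtual ~data_source_factory() = default;
    virtual std::unique_ptr<data_source> make_source() = 0; // assumed shape
};

struct data_sink_factory {
    virtual ~data_sink_factory() = default;
    virtual std::unique_ptr<data_sink> make_sink() = 0; // assumed shape
};

} // namespace cluster_link::replication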

src/v/cluster/tests/cluster_test_fixture.h

Lines changed: 12 additions & 6 deletions
@@ -122,7 +122,8 @@ class cluster_test_fixture {
       std::optional<cloud_storage::configuration> cloud_cfg = std::nullopt,
       bool enable_legacy_upload_mode = true,
       bool iceberg_enabled = false,
-      bool cloud_topics_enabled = false) {
+      bool cloud_topics_enabled = false,
+      bool cluster_linking_enabled = false) {
         return std::make_unique<redpanda_thread_fixture>(
           node_id,
           kafka_port,
@@ -141,7 +142,8 @@
           false,
           enable_legacy_upload_mode,
           iceberg_enabled,
-          cloud_topics_enabled);
+          cloud_topics_enabled,
+          cluster_linking_enabled);
     }
 
     void add_node(
@@ -160,7 +162,8 @@
       std::optional<cloud_storage::configuration> cloud_cfg = std::nullopt,
       bool enable_legacy_upload_mode = true,
       bool iceberg_enabled = false,
-      bool cloud_topics_enabled = false) {
+      bool cloud_topics_enabled = false,
+      bool cluster_linking_enabled = false) {
         _instances.emplace(
           node_id,
           make_redpanda_fixture(
@@ -177,7 +180,8 @@
             cloud_cfg,
             enable_legacy_upload_mode,
             iceberg_enabled,
-            cloud_topics_enabled));
+            cloud_topics_enabled,
+            cluster_linking_enabled));
     }
 
     application* get_node_application(model::node_id id) {
@@ -212,7 +216,8 @@
       std::optional<cloud_storage::configuration> cloud_cfg = std::nullopt,
       bool legacy_upload_mode_enabled = true,
       bool iceberg_enabled = false,
-      bool cloud_topics_enabled = false) {
+      bool cloud_topics_enabled = false,
+      bool cluster_linking_enabled = false) {
         std::vector<config::seed_server> seeds = {};
         if (!empty_seed_starts_cluster_val || node_id != 0) {
             seeds.push_back(
@@ -232,7 +237,8 @@
           cloud_cfg,
           legacy_upload_mode_enabled,
           iceberg_enabled,
-          cloud_topics_enabled);
+          cloud_topics_enabled,
+          cluster_linking_enabled);
         return get_node_application(node_id);
     }

src/v/cluster_link/BUILD

Lines changed: 5 additions & 0 deletions
@@ -73,6 +73,8 @@ redpanda_cc_library(
         "//src/v/base",
         "//src/v/cluster",
         "//src/v/cluster_link/model",
+        "//src/v/cluster_link/replication:deps",
+        "//src/v/cluster_link/replication:link_replication_mgr",
         "//src/v/container:chunked_vector",
         "//src/v/kafka/client:cluster",
         "//src/v/kafka/data/rpc",
@@ -138,6 +140,9 @@ redpanda_cc_library(
         "//src/v/base",
         "//src/v/cluster",
         "//src/v/cluster/utils:partition_change_notifier_api",
+        "//src/v/cluster_link/replication:deps_impl",
+        "//src/v/cluster_link/replication:mux_remote_consumer",
+        "//src/v/kafka/client/direct_consumer",
         "//src/v/model",
         "//src/v/raft",
         "@seastar",

src/v/cluster_link/link.cc

Lines changed: 44 additions & 8 deletions
@@ -13,7 +13,6 @@
 
 #include "cluster_link/logger.h"
 #include "cluster_link/manager.h"
-#include "model/namespace.h"
 #include "ssx/future-util.h"
 
 #include <seastar/coroutine/as_future.hh>
@@ -57,31 +56,40 @@ link::link(
   manager* manager,
   ss::lowres_clock::duration task_reconciler_interval,
   model::metadata config,
-  std::unique_ptr<kafka::client::cluster> cluster_connection)
+  std::unique_ptr<kafka::client::cluster> cluster_connection,
+  std::unique_ptr<replication::data_source_factory> data_source_factory,
+  std::unique_ptr<replication::data_sink_factory> data_sink_factory)
   : _self(self)
   , _link_id(link_id)
   , _manager(manager)
   , _config(std::move(config))
   , _cluster_connection(std::move(cluster_connection))
+  , _replication_mgr(
+      // todo: fix me
+      ss::default_scheduling_group(),
+      std::move(data_source_factory),
+      std::move(data_sink_factory))
   , _task_reconciler_interval(task_reconciler_interval) {}
 
 ss::future<> link::start() {
     vlog(
      cllog.info, "Starting cluster link {} ({})", _config.name, _config.uuid);
     // Allow exception to propagate to the caller
     co_await _cluster_connection->start();
+    co_await _replication_mgr.start();
     co_await run_task_reconciler();
     _task_reconciler.set_callback([this] {
         ssx::spawn_with_gate(_gate, [this] { return run_task_reconciler(); });
     });
     _task_reconciler.arm_periodic(_task_reconciler_interval);
 }
 
-ss::future<> link::stop() {
+ss::future<> link::stop() noexcept {
     vlog(
      cllog.info, "Stopping cluster link {} ({})", _config.name, _config.uuid);
     _task_reconciler.cancel();
     _as.request_abort();
+    co_await _replication_mgr.stop();
     co_await _gate.close();
 
     for (auto& [_, t] : _tasks) {
@@ -90,7 +98,16 @@ ss::future<> link::stop() {
           "Stopping task {} for cluster link {}",
           t->name(),
           _config.name);
-        auto res = co_await stop_task(t.get());
+        auto res_f = co_await ss::coroutine::as_future(stop_task(t.get()));
+        if (res_f.failed()) {
+            vlog(
+              cllog.warn,
+              "Failed to stop task {}: {}",
+              t->name(),
+              res_f.get_exception());
+            continue;
+        }
+        auto res = res_f.get();
         if (!res) {
             if (res.assume_error().code() == errc::task_not_running) {
                 // that's ok, keep going
@@ -148,15 +165,34 @@ void link::update_config(model::metadata config) {
     }
 }
 
-ss::future<>
-link::handle_on_leadership_change(::model::ntp ntp, ntp_leader is_ntp_leader) {
+ss::future<> link::handle_on_leadership_change(
+  ::model::ntp ntp,
+  ntp_leader is_ntp_leader,
+  std::optional<::model::term_id> term) {
     vlog(
       cllog.trace,
-      "Cluster link {} handling leadership change for {}: {}",
+      "Cluster link {} handling leadership change for {}: {}, term: {}",
       _config.name,
       ntp,
-      is_ntp_leader);
+      is_ntp_leader,
+      term);
 
+    const auto& mirror_topics = _config.state.mirror_topics;
+    if (mirror_topics.contains(ntp.tp.topic)) {
+        vlog(
+          cllog.debug,
+          "[{}] Leadership change event for partition {}, is_leader: {}",
+          _link_id,
+          ntp,
+          is_ntp_leader);
+        if (is_ntp_leader) {
+            vassert(
+              term, "Term must be set when leadership is assumed: {}", ntp);
+            _replication_mgr.start_replicator(ntp, *term);
+        } else {
+            _replication_mgr.stop_replicator(ntp, term);
+        }
+    }
     // todo: add debouncing here so that we do not trigger multiple
     // reconciliation loops at once.
     co_await run_task_reconciler();
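
The reworked shutdown path above leans on ss::coroutine::as_future so that link::stop(), now noexcept, can observe a failed stop_task() future without letting the exception escape. A minimal, self-contained sketch of that pattern follows; stop_one() and the loop are invented for illustration and are not part of this commit.

// Standalone illustration of the as_future error-handling pattern used in
// link::stop(); stop_one() is a hypothetical stand-in for stop_task().
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/util/log.hh>

#include <stdexcept>

namespace ss = seastar;

static ss::logger example_log("stop-example");

ss::future<> stop_one(int i) {
    if (i % 2 == 0) {
        // The exception is captured into the coroutine's returned future.
        throw std::runtime_error("simulated stop failure");
    }
    co_return;
}

ss::future<> stop_all(int n) noexcept {
    for (int i = 0; i < n; ++i) {
        // as_future() always yields an available future instead of throwing,
        // so the noexcept coroutine can log the failure and keep shutting down.
        auto f = co_await ss::coroutine::as_future(stop_one(i));
        if (f.failed()) {
            example_log.warn(
              "failed to stop item {}: {}", i, f.get_exception());
            continue;
        }
    }
}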

src/v/cluster_link/link.h

Lines changed: 11 additions & 4 deletions
@@ -12,6 +12,8 @@
 #pragma once
 
 #include "cluster_link/model/types.h"
+#include "cluster_link/replication/deps.h"
+#include "cluster_link/replication/link_replication_mgr.h"
 #include "cluster_link/task.h"
 #include "cluster_link/types.h"
 #include "kafka/client/cluster.h"
@@ -31,22 +33,26 @@ class link {
       manager* manager,
       ss::lowres_clock::duration task_reconciler_interval,
       model::metadata config,
-      std::unique_ptr<kafka::client::cluster> cluster_connection);
+      std::unique_ptr<kafka::client::cluster> cluster_connection,
+      std::unique_ptr<replication::data_source_factory>,
+      std::unique_ptr<replication::data_sink_factory>);
     link(const link&) = delete;
     link(link&&) = delete;
     link& operator=(const link&) = delete;
    link& operator=(link&&) = delete;
     virtual ~link() = default;
 
     virtual ss::future<> start();
-    virtual ss::future<> stop();
+    virtual ss::future<> stop() noexcept;
 
     ss::future<result<void>> register_task(task_factory*);
 
     void update_config(model::metadata);
 
-    ss::future<>
-    handle_on_leadership_change(::model::ntp ntp, ntp_leader is_ntp_leader);
+    ss::future<> handle_on_leadership_change(
+      ::model::ntp ntp,
+      ntp_leader is_ntp_leader,
+      std::optional<::model::term_id>);
 
     const model::metadata& config() const;
 
@@ -111,6 +117,7 @@ class link {
     chunked_hash_map<ss::sstring, std::unique_ptr<task>> _tasks;
     model::metadata _config;
     std::unique_ptr<kafka::client::cluster> _cluster_connection;
+    replication::link_replication_manager _replication_mgr;
 
     notification_list<task_state_change_cb, task_state_notification_id>
       _task_state_change_notifications;

src/v/cluster_link/manager.cc

Lines changed: 43 additions & 34 deletions
@@ -103,23 +103,6 @@ ss::future<> manager::start() {
         co_await handle_on_link_change(id);
     }
 
-    auto controller_node_leader = _partition_leader_cache->get_leader_node(
-      ::model::controller_ntp);
-    if (
-      controller_node_leader.has_value()
-      && controller_node_leader.value() == _self) {
-        auto controller_shard_leader = _partition_manager->shard_owner(
-          ::model::controller_ntp);
-        if (
-          controller_shard_leader.has_value()
-          && controller_shard_leader.value() == ss::this_shard_id()) {
-            vlog(
-              cllog.info, "Cluster link manager started on controller shard");
-            handle_partition_state_change(
-              ::model::controller_ntp, ntp_leader::yes);
-        }
-    }
-
     _link_task_reconciler_timer.set_callback([this] {
         ssx::spawn_with_gate(_g, [this] { return link_task_reconciler(); });
     });
@@ -223,10 +206,12 @@ void manager::on_link_change(model::id_t id) {
 }
 
 void manager::handle_partition_state_change(
-  ::model::ntp ntp, ntp_leader is_ntp_leader) {
+  ::model::ntp ntp,
+  ntp_leader is_ntp_leader,
+  std::optional<::model::term_id> term) {
     vlog(cllog.trace, "NTP={} leadership changed to {}", ntp, is_ntp_leader);
-    _queue.submit([this, ntp{std::move(ntp)}, is_ntp_leader]() mutable {
-        return handle_on_leadership_change(std::move(ntp), is_ntp_leader);
+    _queue.submit([this, ntp{std::move(ntp)}, is_ntp_leader, term]() mutable {
+        return handle_on_leadership_change(std::move(ntp), is_ntp_leader, term);
     });
 }
 
@@ -243,20 +228,17 @@ ss::future<> manager::handle_on_link_change(model::id_t id) {
         try {
            vlog(cllog.debug, "Stopping cluster link with id={}", id);
            co_await it->second->stop();
-            _links.erase(it);
         } catch (const std::exception& e) {
+            // generally not possible since stop() is noexcept
+            // but is not enforced for coroutines by the compiler.
             vlog(
               cllog.warn,
-              "Failed to stop link {}: \"{}\". Re-attempting link "
-              "stop "
-              "within {} seconds",
+              "Failed to stop link {}: \"{}, going ahead and removing "
+              "it\".",
               id,
-              e,
-              retry_delay.count());
-            _queue.submit_delayed(retry_delay, [this, id] {
-                return handle_on_link_change(id);
-            });
+              e);
         }
+        _links.erase(it);
     } else {
         vlog(cllog.trace, "No link found for id={}", id);
     }
@@ -312,7 +294,30 @@ ss::future<> manager::handle_on_link_change(model::id_t id) {
              e);
            }
        }
-        co_await new_link->start();
+
+        std::exception_ptr start_eptr = nullptr;
+        try {
+            co_await new_link->start();
+        } catch (...) {
+            start_eptr = std::current_exception();
+        }
+        if (start_eptr) {
+            vlog(
+              cllog.warn,
+              "Failed to start link {}: \"{}\"",
+              id,
+              start_eptr);
+            try {
+                co_await new_link->stop();
+            } catch (...) {
+                vlog(
+                  cllog.warn,
+                  "Failed to stop link {}: \"{}\", ignoring..",
                  id,
+                  std::current_exception());
+            }
+            std::rethrow_exception(start_eptr);
+        }
         _links.emplace(id, std::move(new_link));
         _link_created_cv.broadcast();
     } catch (const ss::semaphore_aborted&) {
@@ -394,7 +399,9 @@ ss::future<> manager::link_task_reconciler() {
 }
 
 ss::future<> manager::handle_on_leadership_change(
-  ::model::ntp ntp, ntp_leader is_ntp_leader) {
+  ::model::ntp ntp,
+  ntp_leader is_ntp_leader,
+  std::optional<::model::term_id> term) {
     vlog(
       cllog.trace,
       "Handling leadership change for NTP={}, is_ntp_leader={}",
@@ -415,9 +422,11 @@ ss::future<> manager::handle_on_leadership_change(
         }
     }
 
-    co_await ss::parallel_for_each(_links, [ntp, is_ntp_leader](auto& pair) {
-        return pair.second->handle_on_leadership_change(ntp, is_ntp_leader);
-    });
+    co_await ss::parallel_for_each(
+      _links, [ntp, is_ntp_leader, term](auto& pair) {
+          return pair.second->handle_on_leadership_change(
+            ntp, is_ntp_leader, term);
+      });
 }
 
 ss::future<::cluster::cluster_link::errc> manager::add_mirror_topic(

src/v/cluster_link/manager.h

Lines changed: 9 additions & 3 deletions
@@ -72,12 +72,18 @@ class manager {
     /// Used to notify that a cluster link has been updated
     void on_link_change(model::id_t id);
     /// Used to notify manager in a change of NTP leadership
-    void
-    handle_partition_state_change(::model::ntp ntp, ntp_leader is_ntp_leader);
+    void handle_partition_state_change(
+      ::model::ntp ntp,
+      ntp_leader is_ntp_leader,
+      std::optional<::model::term_id>);
     /// Handles creation and start of a link
     ss::future<> handle_on_link_change(model::id_t id);
     /// Handles leadership changes for a given NTP
-    ss::future<> handle_on_leadership_change(::model::ntp, ntp_leader);
+    /// term will be set if partition still exists on the shard
+    /// Will definitely be set if is_ntp_leader == true because assuming
+    // leadership implies the partition is still present
+    ss::future<> handle_on_leadership_change(
+      ::model::ntp, ntp_leader, std::optional<::model::term_id>);
     /// Used to add a mirror topic to a cluster link
     ss::future<::cluster::cluster_link::errc>
     add_mirror_topic(model::id_t link_id, model::add_mirror_topic_cmd cmd);
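
To make the term contract in the comment above concrete: callers pass the partition's current term whenever leadership is gained, and may pass std::nullopt on leadership loss if the partition has already been removed from the shard. The sketch below shows hypothetical call sites only; the helper functions and the exact namespaces are assumptions based on the diff, not code from this commit.

// Hypothetical call sites; names and namespaces are assumed from context and
// may not match the real headers exactly.
void on_leader_elected(
  cluster_link::manager& mgr, model::ntp ntp, model::term_id term) {
    // Gained leadership: the term is known, so it is always passed.
    mgr.handle_partition_state_change(
      std::move(ntp), cluster_link::ntp_leader::yes, term);
}

void on_leader_lost(
  cluster_link::manager& mgr,
  model::ntp ntp,
  std::optional<model::term_id> term_if_still_hosted) {
    // Lost leadership: the partition may already have been removed from this
    // shard, in which case no term is available and nullopt is passed.
    mgr.handle_partition_state_change(
      std::move(ntp), cluster_link::ntp_leader::no, term_if_still_hosted);
}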
