Skip to content

Commit a2559f6

Browse files
committed
cl/service: update consumer configuration when link configuration change
When link configuration changes we must update configuration of consumer used to fetch data from the source cluster. This commits register manager notification that listens for the link configuration changes and updates the mux consumer configuration. Signed-off-by: Michał Maślanka <[email protected]>
1 parent 44a88c7 commit a2559f6

File tree

1 file changed

+26
-3
lines changed

1 file changed

+26
-3
lines changed

src/v/cluster_link/service.cc

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,19 +217,40 @@ class remote_partition_source : public replication::data_source {
217217
class remote_data_source_factory : public replication::data_source_factory {
218218
public:
219219
explicit remote_data_source_factory(
220+
model::id_t link_id,
221+
manager* manager,
220222
std::unique_ptr<replication::mux_remote_consumer> consumer)
221-
: _consumer(std::move(consumer)) {}
223+
: _link_id(link_id)
224+
, _manager(manager)
225+
, _consumer(std::move(consumer)) {}
222226

223-
ss::future<> start() final { return _consumer->start(); }
227+
ss::future<> start() final {
228+
_notification_id = _manager->register_link_config_changes_callback(
229+
[this](model::id_t link_id, const model::metadata& md) {
230+
// Ignore updates for other links
231+
if (link_id != _link_id) {
232+
return;
233+
}
234+
_consumer->update_configuration(
235+
make_remote_consumer_configuration(md.connection));
236+
});
237+
return _consumer->start();
238+
}
224239

225-
ss::future<> stop() noexcept final { return _consumer->stop(); }
240+
ss::future<> stop() noexcept final {
241+
_manager->unregister_link_config_changes_callback(_notification_id);
242+
return _consumer->stop();
243+
}
226244

227245
std::unique_ptr<replication::data_source>
228246
make_source(const ::model::ntp& ntp) final {
229247
return make_default_data_source(ntp.tp, *_consumer);
230248
}
231249

232250
private:
251+
model::id_t _link_id;
252+
manager* _manager;
253+
manager::notification_id _notification_id;
233254
std::unique_ptr<replication::mux_remote_consumer> _consumer;
234255
};
235256

@@ -406,6 +427,8 @@ class default_link_factory : public link_factory {
406427
std::move(config),
407428
std::move(cluster_connection),
408429
std::make_unique<remote_data_source_factory>(
430+
link_id,
431+
manager,
409432
std::make_unique<replication::mux_remote_consumer>(
410433
*cluster_connection,
411434
make_remote_consumer_configuration(config.connection))),

0 commit comments

Comments
 (0)