diff --git a/proto/redpanda/core/admin/v2/shadow_link.proto b/proto/redpanda/core/admin/v2/shadow_link.proto index adb9dc0f7d209..151fa976fb381 100644 --- a/proto/redpanda/core/admin/v2/shadow_link.proto +++ b/proto/redpanda/core/admin/v2/shadow_link.proto @@ -199,14 +199,17 @@ message ShadowLinkClientOptions { // If 0 is provided, defaults to 100ms int32 retry_backoff_ms = 8; // Fetch request timeout - // If 0 is provided, defaults to 100ms + // If 0 is provided, defaults to 500ms int32 fetch_wait_max_ms = 9; // Fetch min bytes - // If 0 is provided, defaults to 1 byte + // If 0 is provided, defaults to 5 MiB int32 fetch_min_bytes = 10; // Fetch max bytes - // If 0 is provided, defaults to 1MiB + // If 0 is provided, defaults to 20 MiB int32 fetch_max_bytes = 11; + // Fetch partition max bytes + // If 0 is provided, defaults to 1 MiB + int32 fetch_partition_max_bytes = 12; } // Options for syncing topic metadata message TopicMetadataSyncOptions { diff --git a/src/v/cluster_link/BUILD b/src/v/cluster_link/BUILD index a992b2b760ac0..139dec825a90f 100644 --- a/src/v/cluster_link/BUILD +++ b/src/v/cluster_link/BUILD @@ -193,10 +193,11 @@ redpanda_cc_library( "//src/v/base", "//src/v/cluster", "//src/v/cluster/utils:partition_change_notifier_api", - "//src/v/cluster_link/replication:deps_impl", + "//src/v/cluster_link/replication:deps", "//src/v/cluster_link/replication:mux_remote_consumer", "//src/v/kafka/client/direct_consumer", "//src/v/kafka/server", + "//src/v/kafka/server:write_at_offset_stm", "//src/v/model", "//src/v/raft", "@seastar", diff --git a/src/v/cluster_link/manager.cc b/src/v/cluster_link/manager.cc index 1c0a7e77ed5a5..9c6aefb63043d 100644 --- a/src/v/cluster_link/manager.cc +++ b/src/v/cluster_link/manager.cc @@ -357,6 +357,7 @@ ss::future<> manager::handle_on_link_change(model::id_t id) { id, link_metadata); it->second->update_config(link_metadata.copy()); + _cfg_change_notifications.notify(id, link_metadata); } else { // Create a new link vlog( @@ -620,6 +621,15 @@ ss::future<> manager::stop_topic_reconciler() { } } +manager::notification_id manager::register_link_config_changes_callback( + link_cfg_change_notification_cb cb) { + return _cfg_change_notifications.register_cb(std::move(cb)); +} + +void manager::unregister_link_config_changes_callback(notification_id id) { + _cfg_change_notifications.unregister_cb(id); +} + consumer_groups_router& manager::get_group_router() noexcept { return *_group_router; } diff --git a/src/v/cluster_link/manager.h b/src/v/cluster_link/manager.h index 60bfe08cc96c4..0013ce3d97631 100644 --- a/src/v/cluster_link/manager.h +++ b/src/v/cluster_link/manager.h @@ -35,6 +35,9 @@ namespace cluster_link { */ class manager { public: + using notification_id = named_type; + using link_cfg_change_notification_cb + = ss::noncopyable_function; manager( ::model::node_id self, std::unique_ptr @@ -157,6 +160,10 @@ class manager { return _scheduling_group; } + notification_id + register_link_config_changes_callback(link_cfg_change_notification_cb cb); + void unregister_link_config_changes_callback(notification_id cb); + private: /// Called periodically to reconcile registered tasks on created links ss::future<> link_task_reconciler(); @@ -182,6 +189,8 @@ class manager { chunked_vector> _task_factories; absl::flat_hash_map> _links; + notification_list + _cfg_change_notifications; ss::lowres_clock::duration _task_reconciler_interval; mutex _link_task_reconciler_mutex{ diff --git a/src/v/cluster_link/model/types.h b/src/v/cluster_link/model/types.h index 5b31bfbbdbb23..7665e495de023 100644 --- a/src/v/cluster_link/model/types.h +++ b/src/v/cluster_link/model/types.h @@ -133,7 +133,7 @@ std::ostream& operator<<(std::ostream& os, const tls_file_or_value& t); */ struct connection_config : serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { /// List of addresses to bootstrap the connection std::vector bootstrap_servers; /// Support authn variants. Currently only SCRAM but update this to add @@ -163,16 +163,20 @@ struct connection_config static constexpr auto retry_backoff_ms_default = 100; // Maximum fetch wait time std::optional fetch_wait_max_ms; - // Default value for fetch_wait_max_ms (100ms) - static constexpr auto fetch_wait_max_ms_default = 100; + // Default value for fetch_wait_max_ms (500ms) + static constexpr auto fetch_wait_max_ms_default = 500; // Minimum number of bytes to fetch std::optional fetch_min_bytes; - // Default minimum number of bytes to fetch (1B) - static constexpr auto fetch_min_bytes_default = 1; + // Default minimum number of bytes to fetch (5MiB) + static constexpr auto fetch_min_bytes_default = 5_MiB; // Maximum number of bytes to fetch std::optional fetch_max_bytes; + // Maximum number of bytes to fetch per partition + std::optional fetch_partition_max_bytes; + // Default maximum number of bytes to fetch per partition + static constexpr auto default_fetch_partition_max_bytes = 1_MiB; // Default maximum number of bytes to fetch (1MiB) - static constexpr auto fetch_max_bytes_default = 1 * 1024 * 1024; + static constexpr auto fetch_max_bytes_default = 20 * 1024 * 1024; // Returns the metadata_max_age_ms value int32_t get_metadata_max_age_ms() const { @@ -204,6 +208,11 @@ struct connection_config return fetch_max_bytes.value_or(fetch_max_bytes_default); } + int32_t get_fetch_partition_max_bytes() const { + return fetch_partition_max_bytes.value_or( + default_fetch_partition_max_bytes); + } + friend bool operator==(const connection_config&, const connection_config&) = default; @@ -220,7 +229,8 @@ struct connection_config retry_backoff_ms, fetch_wait_max_ms, fetch_min_bytes, - fetch_max_bytes); + fetch_max_bytes, + fetch_partition_max_bytes); } friend std::ostream& diff --git a/src/v/cluster_link/replication/BUILD b/src/v/cluster_link/replication/BUILD index ac6c1927ae35e..1b1353807bd28 100644 --- a/src/v/cluster_link/replication/BUILD +++ b/src/v/cluster_link/replication/BUILD @@ -22,7 +22,10 @@ redpanda_cc_library( hdrs = [ "partition_replicator.h", ], - visibility = ["__subpackages__"], + visibility = [ + "__subpackages__", + "//src/v/cluster_link:__subpackages__", + ], deps = [ ":deps", "//src/v/cluster_link:logger", @@ -88,6 +91,7 @@ redpanda_cc_library( visibility = ["//src/v/cluster_link:__subpackages__"], deps = [ ":partition_data_queue", + "//src/v/base", "//src/v/cluster_link:logger", "//src/v/container:chunked_hash_map", "//src/v/kafka/client/direct_consumer", @@ -95,23 +99,3 @@ redpanda_cc_library( "@seastar", ], ) - -redpanda_cc_library( - name = "deps_impl", - srcs = [ - "deps_impl.cc", - ], - hdrs = [ - "deps_impl.h", - ], - visibility = ["//src/v/cluster_link:__subpackages__"], - deps = [ - ":deps", - ":mux_remote_consumer", - "//src/v/cluster", - "//src/v/cluster_link:logger", - "//src/v/kafka/server:write_at_offset_stm", - "//src/v/ssx:future_util", - "@seastar", - ], -) diff --git a/src/v/cluster_link/replication/deps_impl.cc b/src/v/cluster_link/replication/deps_impl.cc deleted file mode 100644 index dfbe8f6183059..0000000000000 --- a/src/v/cluster_link/replication/deps_impl.cc +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Copyright 2025 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ - -#include "cluster_link/replication/deps_impl.h" - -#include "cluster/partition.h" -#include "cluster/partition_manager.h" -#include "cluster_link/logger.h" -#include "cluster_link/replication/mux_remote_consumer.h" -#include "kafka/server/write_at_offset_stm.h" -#include "ssx/future-util.h" - -static constexpr auto sync_timeout = 10s; - -namespace cluster_link::replication { - -remote_data_source_factory::remote_data_source_factory( - std::unique_ptr consumer) - : _consumer(std::move(consumer)) {} - -// to keep the unique_ptr fwd declaration happy -remote_data_source_factory::~remote_data_source_factory() = default; - -ss::future<> remote_data_source_factory::start() { return _consumer->start(); } - -ss::future<> remote_data_source_factory::stop() noexcept { - return _consumer->stop(); -} - -std::unique_ptr -remote_data_source_factory::make_source(const ::model::ntp& ntp) { - return std::make_unique(ntp.tp, *_consumer); -} - -ss::future<> remote_partition_source::start(kafka::offset offset) { - vlog(cllog.trace, "[{}] Starting remote partition source", _tp); - auto result = _consumer.add(_tp, offset); - if (!result.has_value()) [[unlikely]] { - // this is usually indicative of a bug in the manager where - // a previous source is not deregistered, bubble it up. - auto err = result.error(); - vlog( - cllog.error, - "[{}] Failed to add remote partition source: {}", - _tp, - err); - return ss::make_exception_future<>(err); - } - return ss::now(); -} - -ss::future<> remote_partition_source::stop() noexcept { - vlog(cllog.trace, "[{}] Stopping remote partition source", _tp); - auto f = _gate.close(); - co_await _consumer.remove(_tp); - co_await std::move(f); -} - -ss::future<> remote_partition_source::reset(kafka::offset offset) { - _gate.check(); - auto result = _consumer.reset(_tp, offset); - if (!result.has_value()) [[unlikely]] { - auto err = result.error(); - vlog( - cllog.error, - "[{}] Failed to reset remote partition source: {}", - _tp, - err); - return ss::make_exception_future<>(err); - } - return ss::now(); -} - -ss::future -remote_partition_source::fetch_next(ss::abort_source& as) { - auto holder = _gate.hold(); - auto result = co_await _consumer.fetch(_tp, as); - if (!result.has_value()) [[unlikely]] { - auto err = result.error(); - vlog( - cllog.error, - "[{}] Failed to fetch from remote partition source: {}", - _tp, - result.error()); - throw std::runtime_error( - fmt::format( - "[{}] Failed to fetch from remote partition source: {}", _tp, err)); - } - auto [batches, units] = std::move(*result); - co_return data_source::data{ - .batches = std::move(batches), .units = std::move(units)}; -} - -std::unique_ptr -local_partition_data_sink_factory::make_sink(const ::model::ntp& ntp) { - auto partition = _partition_manager.local().get(ntp); - if (!partition) { - throw std::runtime_error( - fmt::format("Partition not found: {} on this shard", ntp)); - } - return std::make_unique(std::move(partition)); -} - -local_partition_sink::local_partition_sink( - ss::lw_shared_ptr partition) - : _partition(std::move(partition)) - , _stm(_partition->raft()->stm_manager()->get()) { - vassert( - _stm, - "write_at_offset_stm not attached to partition {}", - _partition->ntp()); -} -ss::future<> local_partition_sink::start() { - auto holder = _gate.hold(); - auto sync_offset = co_await _stm->get_expected_last_offset(sync_timeout); - if (sync_offset.has_error()) { - throw std::runtime_error( - fmt::format( - "Failed to sync write_at_offset_stm for partition {}: {}", - _partition->ntp(), - sync_offset.error().message())); - } - vlog( - cllog.trace, - "[{}] Starting local partition sink at offset {}", - _partition->ntp(), - sync_offset.value()); - _last_replicated_offset = sync_offset.value(); -} - -ss::future<> local_partition_sink::stop() noexcept { - vlog(cllog.trace, "[{}] Stopping local partition sink", _partition->ntp()); - co_await _gate.close(); -} - -kafka::offset local_partition_sink::last_replicated_offset() const { - vassert(_last_replicated_offset, "Sink has not been started"); - return _last_replicated_offset.value(); -} - -raft::replicate_stages local_partition_sink::replicate( - chunked_vector<::model::record_batch> batches, - ::model::timeout_clock::duration timeout, - ss::abort_source& as) { - _gate.check(); - vassert(_last_replicated_offset, "Sink has not been started"); - vassert( - !batches.empty(), - "Cannot replicate empty batch vector {}", - _partition->ntp()); - chunked_vector expected_offsets; - expected_offsets.reserve(batches.size()); - for (const auto& batch : batches) { - expected_offsets.push_back(::model::offset_cast(batch.base_offset())); - } - auto new_last_replicated_begin = ::model::offset_cast( - batches.front().base_offset()); - auto new_last_replicated_end = ::model::offset_cast( - batches.back().last_offset()); - vassert( - new_last_replicated_begin > _last_replicated_offset - && new_last_replicated_end > _last_replicated_offset, - "[{}] Replicating offsets must be monotonically increasing last " - "replicated: {}, attempting to replicate: [{}, {}]", - _partition->ntp(), - _last_replicated_offset, - new_last_replicated_begin, - new_last_replicated_end); - vlog( - cllog.trace, - "[{}] Replicating batches in range [{} - {}], last_replicated: {}, " - "new_last_replicated: {}", - _partition->ntp(), - batches.front().header(), - batches.back().header(), - _last_replicated_offset, - new_last_replicated_end); - auto stages = _stm->replicate( - std::move(batches), - std::move(expected_offsets), - _last_replicated_offset, - timeout, - as); - _last_replicated_offset = new_last_replicated_end; - return stages; -} - -void local_partition_sink::notify_replicator_failure(::model::term_id term) { - if (_gate.is_closed()) { - return; - } - // If the replicator failed to start _and_ the partition is still the - // leader in the same term we are effectively stuck without a replicator. - // Here we step down to ensure a new leader comes up and a replicator start - // is triggered again on the new leader. - if (_partition->term() == term) { - ssx::spawn_with_gate(_gate, [this, term] { - return _partition->raft()->step_down( - fmt::format("Unable to start replicator in term: {}", term)); - }); - } -} -} // namespace cluster_link::replication diff --git a/src/v/cluster_link/replication/deps_impl.h b/src/v/cluster_link/replication/deps_impl.h deleted file mode 100644 index 95eab0cc0f9b2..0000000000000 --- a/src/v/cluster_link/replication/deps_impl.h +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright 2025 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ - -#include "cluster/fwd.h" -#include "cluster_link/replication/deps.h" - -#include -#include - -namespace kafka { -class write_at_offset_stm; -}; - -namespace cluster_link::replication { - -class mux_remote_consumer; -/* - * Source backed by partition data on a remote cluster. - */ -class remote_partition_source : public data_source { -public: - explicit remote_partition_source( - ::model::topic_partition tp, mux_remote_consumer& consumer) - : _tp(std::move(tp)) - , _consumer(consumer) {} - ss::future<> start(kafka::offset) override; - ss::future<> stop() noexcept override; - ss::future<> reset(kafka::offset) override; - ss::future fetch_next(ss::abort_source&) override; - -private: - ::model::topic_partition _tp; - mux_remote_consumer& _consumer; - ss::gate _gate; -}; - -class remote_data_source_factory : public data_source_factory { -public: - explicit remote_data_source_factory(std::unique_ptr); - ~remote_data_source_factory() override; - ss::future<> start() override; - ss::future<> stop() noexcept override; - std::unique_ptr make_source(const ::model::ntp&) override; - -private: - std::unique_ptr _consumer; -}; - -/* - * Sink for writing partition data to the partition leader on the local shard. - */ -class local_partition_sink : public data_sink { -public: - explicit local_partition_sink(ss::lw_shared_ptr); - ss::future<> start() override; - ss::future<> stop() noexcept override; - kafka::offset last_replicated_offset() const override; - raft::replicate_stages replicate( - chunked_vector<::model::record_batch> batches, - ::model::timeout_clock::duration timeout, - ss::abort_source& as) override; - void notify_replicator_failure(::model::term_id) override; - -private: - ss::gate _gate; - ss::lw_shared_ptr _partition; - ss::shared_ptr _stm; - // set in start(); - std::optional _last_replicated_offset; -}; - -class local_partition_data_sink_factory : public data_sink_factory { -public: - explicit local_partition_data_sink_factory( - ss::sharded& pm) - : _partition_manager(pm) {} - std::unique_ptr make_sink(const ::model::ntp&) override; - -private: - ss::sharded& _partition_manager; -}; - -} // namespace cluster_link::replication diff --git a/src/v/cluster_link/replication/mux_remote_consumer.cc b/src/v/cluster_link/replication/mux_remote_consumer.cc index aaa58cf40997c..206c5db0a1771 100644 --- a/src/v/cluster_link/replication/mux_remote_consumer.cc +++ b/src/v/cluster_link/replication/mux_remote_consumer.cc @@ -15,6 +15,15 @@ namespace cluster_link::replication { +mux_remote_consumer::mux_remote_consumer( + kafka::client::cluster& cluster, + mux_remote_consumer::configuration consumer_configuration) + : _consumer( + std::make_unique( + cluster, consumer_configuration.direct_consumer_configuration)) + , _partition_max_buffered(consumer_configuration.partition_max_buffered) + , _fetch_max_wait(consumer_configuration.fetch_max_wait) {} + ss::future<> mux_remote_consumer::start() { co_await _consumer->start(); ssx::repeat_until_gate_closed(_gate, [this]() { @@ -231,6 +240,27 @@ ss::future<> mux_remote_consumer::fetch_loop() { co_await process_fetched_data(std::move(fetch_result.value())); } } + +void mux_remote_consumer::update_configuration(const configuration& cfg) { + vlog(cllog.info, "Updating mux consumer configuration: {}", cfg); + _consumer->update_configuration(cfg.direct_consumer_configuration); + _partition_max_buffered = cfg.partition_max_buffered; + _fetch_max_wait = cfg.fetch_max_wait; + for (auto& [tp, queue] : _partitions) { + queue->update_max_buffered(_partition_max_buffered); + } +} + +fmt::iterator +mux_remote_consumer::configuration::format_to(fmt::iterator it) const { + return fmt::format_to( + it, + "{{ direct_consumer_configuration: {}, partition_max_buffered: {}, " + "fetch_max_wait: {}ms }}", + direct_consumer_configuration, + partition_max_buffered, + fetch_max_wait.count()); +} } // namespace cluster_link::replication auto fmt::formatter:: diff --git a/src/v/cluster_link/replication/mux_remote_consumer.h b/src/v/cluster_link/replication/mux_remote_consumer.h index fd804f1e2c1ae..ece7bbb2e8b69 100644 --- a/src/v/cluster_link/replication/mux_remote_consumer.h +++ b/src/v/cluster_link/replication/mux_remote_consumer.h @@ -10,6 +10,7 @@ #pragma once +#include "base/format_to.h" #include "cluster_link/replication/partition_data_queue.h" #include "container/chunked_hash_map.h" #include "kafka/client/direct_consumer/direct_consumer.h" @@ -37,19 +38,23 @@ namespace cluster_link::replication { */ class mux_remote_consumer { public: + struct configuration { + kafka::client::direct_consumer::configuration + direct_consumer_configuration; + size_t partition_max_buffered; + std::chrono::milliseconds fetch_max_wait; + + fmt::iterator format_to(fmt::iterator it) const; + }; + enum class errc : int8_t { partition_not_found = 1, partition_already_exists = 2, }; using result = std::expected; - explicit mux_remote_consumer( - std::unique_ptr consumer, - size_t partition_max_buffered, - std::chrono::milliseconds fetch_max_wait) - : _consumer(std::move(consumer)) - , _partition_max_buffered(partition_max_buffered) - , _fetch_max_wait(fetch_max_wait) {} + mux_remote_consumer( + kafka::client::cluster& cluster, configuration consumer_configuration); ss::future<> start(); ss::future<> stop() noexcept; @@ -73,8 +78,15 @@ class mux_remote_consumer { */ ss::future> fetch(const ::model::topic_partition&, ss::abort_source&); + /** + * Update the configuration of the consumer. + * + * The changes in the configuration will be applied for subsequent fetches. + */ + void update_configuration(const configuration& cfg); private: + friend class MuxConsumerFixture; bool can_ignore_partition_data(const ::model::topic_partition&); ss::future<> assign_pending_partitions(); ss::future<> fetch_loop(); diff --git a/src/v/cluster_link/replication/partition_data_queue.cc b/src/v/cluster_link/replication/partition_data_queue.cc index ffc8d2878aaea..cc1fc21e0f606 100644 --- a/src/v/cluster_link/replication/partition_data_queue.cc +++ b/src/v/cluster_link/replication/partition_data_queue.cc @@ -15,8 +15,26 @@ namespace cluster_link::replication { partition_data_queue::partition_data_queue(size_t max_buffered_bytes) - : _sem(max_buffered_bytes, "partition_data_queue") {} + : _max_buffered_bytes(max_buffered_bytes) + , _sem(max_buffered_bytes, "partition_data_queue") {} +void partition_data_queue::update_max_buffered(size_t new_value) { + // ignore update if gate is closed, the queue is stopping + if (_gate.is_closed()) { + return; + } + + if (new_value == _max_buffered_bytes) { + return; + } + if (new_value > _max_buffered_bytes) { + _sem.signal(new_value - _max_buffered_bytes); + + } else { + _sem.consume(_max_buffered_bytes - new_value); + } + _max_buffered_bytes = new_value; +} void partition_data_queue::reset(kafka::offset next) { _gate.check(); do_reset(next); diff --git a/src/v/cluster_link/replication/partition_data_queue.h b/src/v/cluster_link/replication/partition_data_queue.h index 724f2c25a1880..ef8ddf63d1696 100644 --- a/src/v/cluster_link/replication/partition_data_queue.h +++ b/src/v/cluster_link/replication/partition_data_queue.h @@ -56,11 +56,14 @@ class partition_data_queue { bool empty() const { return _batches.empty(); } bool full() const { return _sem.available_units() <= 0; } + void update_max_buffered(size_t max_buffered_bytes); + private: void do_reset(kafka::offset next); kafka::offset _next{}; void maybe_notify_waiter(); std::optional> _waiter; + size_t _max_buffered_bytes; ssx::semaphore _sem; ssx::semaphore_units _batch_units; chunked_vector<::model::record_batch> _batches; diff --git a/src/v/cluster_link/replication/tests/BUILD b/src/v/cluster_link/replication/tests/BUILD index 1a99d87ab98c1..f486310128454 100644 --- a/src/v/cluster_link/replication/tests/BUILD +++ b/src/v/cluster_link/replication/tests/BUILD @@ -88,24 +88,3 @@ redpanda_test_cc_library( "@seastar", ], ) - -redpanda_cc_gtest( - name = "partition_replicator_fixture_test", - timeout = "short", - srcs = [ - "partition_replicator_fixture_tests.cc", - ], - cpu = 1, - deps = [ - "//src/v/cluster_link/replication:deps_impl", - "//src/v/cluster_link/replication:mux_remote_consumer", - "//src/v/cluster_link/replication:partition_replicator", - "//src/v/kafka/client/direct_consumer/tests:direct_consumer_fixture", - "//src/v/model", - "//src/v/redpanda/tests:fixture", - "@fmt", - "@googletest//:gtest", - "@seastar", - "@seastar//:testing", - ], -) diff --git a/src/v/cluster_link/replication/tests/mux_consumer_fixture_tests.cc b/src/v/cluster_link/replication/tests/mux_consumer_fixture_tests.cc index 4e0887f97755d..a37641fea71a2 100644 --- a/src/v/cluster_link/replication/tests/mux_consumer_fixture_tests.cc +++ b/src/v/cluster_link/replication/tests/mux_consumer_fixture_tests.cc @@ -26,11 +26,17 @@ class MuxConsumerFixture : public BasicConsumerFixture { public: void SetUp() override { basic_consumer_fixture::SetUp(); - auto consumer = make_consumer(); - _raw_consumer = consumer.get(); + _mux_consumer = std::make_unique( - std::move(consumer), partition_max_buffered, fetch_max_wait); + *cluster, + mux_remote_consumer::configuration{ + .direct_consumer_configuration = direct_consumer:: + configuration{.with_sessions = fetch_sessions_enabled{GetParam() == kafka::client::tests::session_config::with_sessions}}, + .partition_max_buffered = partition_max_buffered, + .fetch_max_wait = fetch_max_wait, + }); _mux_consumer->start().get(); + _raw_consumer = _mux_consumer->_consumer.get(); } void TearDown() override { if (_mux_consumer) { diff --git a/src/v/cluster_link/service.cc b/src/v/cluster_link/service.cc index 52d7d9ac56e9a..fb0372a5ba72a 100644 --- a/src/v/cluster_link/service.cc +++ b/src/v/cluster_link/service.cc @@ -19,12 +19,13 @@ #include "cluster_link/logger.h" #include "cluster_link/manager.h" #include "cluster_link/model/types.h" -#include "cluster_link/replication/deps_impl.h" +#include "cluster_link/replication/deps.h" #include "cluster_link/replication/mux_remote_consumer.h" #include "cluster_link/security_migrator.h" #include "cluster_link/source_topic_syncer.h" #include "kafka/client/direct_consumer/direct_consumer.h" #include "kafka/server/group_router.h" +#include "kafka/server/write_at_offset_stm.h" #include @@ -35,8 +36,6 @@ using kafka::data::rpc::partition_leader_cache; using kafka::data::rpc::partition_manager; using kafka::data::rpc::topic_creator; using kafka::data::rpc::topic_metadata_cache; -using data_src_factory = replication::remote_data_source_factory; -using data_sink_factory = replication::local_partition_data_sink_factory; class link_registry_adapter : public link_registry { public: @@ -112,6 +111,300 @@ class link_registry_adapter : public link_registry { private: frontend* _plf; }; +namespace { +replication::mux_remote_consumer::configuration +make_remote_consumer_configuration(const model::connection_config& conn_cfg) { + const size_t max_buffered_bytes = 2 * conn_cfg.get_fetch_max_bytes(); + kafka::client::direct_consumer::configuration dc_configuration; + const auto max_wait_time = std::chrono::milliseconds( + conn_cfg.get_fetch_wait_max_ms()); + + dc_configuration.min_bytes = conn_cfg.get_fetch_min_bytes(); + dc_configuration.max_fetch_size = conn_cfg.get_fetch_max_bytes(); + dc_configuration.isolation_level = ::model::isolation_level::read_committed; + dc_configuration.max_buffered_bytes = max_buffered_bytes; + // We are not interested in limiting the number of buffered fetches as + // we already set bytes limit + dc_configuration.max_buffered_elements = std::numeric_limits::max(); + dc_configuration.with_sessions = kafka::client::fetch_sessions_enabled::yes; + + dc_configuration.max_wait_time = max_wait_time; + dc_configuration.partition_max_bytes + = conn_cfg.get_fetch_partition_max_bytes(); + + return replication::mux_remote_consumer::configuration{ + .direct_consumer_configuration = dc_configuration, + .partition_max_buffered = max_buffered_bytes, + .fetch_max_wait = max_wait_time, + }; +} + +} // namespace + +class remote_partition_source : public replication::data_source { +public: + explicit remote_partition_source( + ::model::topic_partition tp, replication::mux_remote_consumer& consumer) + : _tp(std::move(tp)) + , _consumer(consumer) {} + + ss::future<> start(kafka::offset offset) final { + vlog(cllog.trace, "[{}] Starting remote partition source", _tp); + auto result = _consumer.add(_tp, offset); + if (!result.has_value()) [[unlikely]] { + // this is usually indicative of a bug in the manager where + // a previous source is not deregistered, bubble it up. + auto err = result.error(); + vlog( + cllog.error, + "[{}] Failed to add remote partition source: {}", + _tp, + err); + return ss::make_exception_future<>(err); + } + return ss::now(); + } + + ss::future<> stop() noexcept final { + vlog(cllog.trace, "[{}] Stopping remote partition source", _tp); + auto f = _gate.close(); + co_await _consumer.remove(_tp); + co_await std::move(f); + } + + ss::future<> reset(kafka::offset offset) final { + _gate.check(); + auto result = _consumer.reset(_tp, offset); + if (!result.has_value()) [[unlikely]] { + auto err = result.error(); + vlog( + cllog.error, + "[{}] Failed to reset remote partition source: {}", + _tp, + err); + return ss::make_exception_future<>(err); + } + return ss::now(); + } + + ss::future + fetch_next(ss::abort_source& as) final { + auto holder = _gate.hold(); + auto result = co_await _consumer.fetch(_tp, as); + if (!result.has_value()) [[unlikely]] { + auto err = result.error(); + vlog( + cllog.error, + "[{}] Failed to fetch from remote partition source: {}", + _tp, + result.error()); + throw std::runtime_error( + fmt::format( + "[{}] Failed to fetch from remote partition source: {}", + _tp, + err)); + } + auto [batches, units] = std::move(*result); + co_return data_source::data{ + .batches = std::move(batches), .units = std::move(units)}; + } + +private: + ::model::topic_partition _tp; + replication::mux_remote_consumer& _consumer; + ss::gate _gate; +}; + +class remote_data_source_factory : public replication::data_source_factory { +public: + explicit remote_data_source_factory( + model::id_t link_id, + manager* manager, + std::unique_ptr consumer) + : _link_id(link_id) + , _manager(manager) + , _consumer(std::move(consumer)) {} + + ss::future<> start() final { + _notification_id = _manager->register_link_config_changes_callback( + [this](model::id_t link_id, const model::metadata& md) { + // Ignore updates for other links + if (link_id != _link_id) { + return; + } + _consumer->update_configuration( + make_remote_consumer_configuration(md.connection)); + }); + return _consumer->start(); + } + + ss::future<> stop() noexcept final { + _manager->unregister_link_config_changes_callback(_notification_id); + return _consumer->stop(); + } + + std::unique_ptr + make_source(const ::model::ntp& ntp) final { + return make_default_data_source(ntp.tp, *_consumer); + } + +private: + model::id_t _link_id; + manager* _manager; + manager::notification_id _notification_id; + std::unique_ptr _consumer; +}; + +/* + * Sink for writing partition data to the partition leader on the local shard. + */ +class local_partition_sink : public replication::data_sink { +public: + static constexpr auto sync_timeout = 10s; + explicit local_partition_sink( + ss::lw_shared_ptr partition) + : _partition(std::move(partition)) + , _stm(_partition->raft() + ->stm_manager() + ->get()) { + vassert( + _stm, + "write_at_offset_stm not attached to partition {}", + _partition->ntp()); + } + ss::future<> start() final { + auto holder = _gate.hold(); + auto sync_offset = co_await _stm->get_expected_last_offset( + sync_timeout); + if (sync_offset.has_error()) { + throw std::runtime_error( + fmt::format( + "Failed to sync write_at_offset_stm for partition {}: {}", + _partition->ntp(), + sync_offset.error().message())); + } + vlog( + cllog.trace, + "[{}] Starting local partition sink at offset {}", + _partition->ntp(), + sync_offset.value()); + _last_replicated_offset = sync_offset.value(); + } + + ss::future<> stop() noexcept final { + vlog( + cllog.trace, "[{}] Stopping local partition sink", _partition->ntp()); + co_await _gate.close(); + } + + kafka::offset last_replicated_offset() const final { + vassert(_last_replicated_offset, "Sink has not been started"); + return _last_replicated_offset.value(); + } + + raft::replicate_stages replicate( + chunked_vector<::model::record_batch> batches, + ::model::timeout_clock::duration timeout, + ss::abort_source& as) final { + _gate.check(); + vassert(_last_replicated_offset, "Sink has not been started"); + vassert( + !batches.empty(), + "Cannot replicate empty batch vector {}", + _partition->ntp()); + chunked_vector expected_offsets; + expected_offsets.reserve(batches.size()); + for (const auto& batch : batches) { + expected_offsets.push_back( + ::model::offset_cast(batch.base_offset())); + } + auto new_last_replicated_begin = ::model::offset_cast( + batches.front().base_offset()); + auto new_last_replicated_end = ::model::offset_cast( + batches.back().last_offset()); + vassert( + new_last_replicated_begin > _last_replicated_offset + && new_last_replicated_end > _last_replicated_offset, + "[{}] Replicating offsets must be monotonically increasing last " + "replicated: {}, attempting to replicate: [{}, {}]", + _partition->ntp(), + _last_replicated_offset, + new_last_replicated_begin, + new_last_replicated_end); + vlog( + cllog.trace, + "[{}] Replicating batches in range [{} - {}], last_replicated: {}, " + "new_last_replicated: {}", + _partition->ntp(), + batches.front().header(), + batches.back().header(), + _last_replicated_offset, + new_last_replicated_end); + auto stages = _stm->replicate( + std::move(batches), + std::move(expected_offsets), + _last_replicated_offset, + timeout, + as); + _last_replicated_offset = new_last_replicated_end; + return stages; + } + + void notify_replicator_failure(::model::term_id term) final { + if (_gate.is_closed()) { + return; + } + // If the replicator failed to start _and_ the partition is still the + // leader in the same term we are effectively stuck without a + // replicator. Here we step down to ensure a new leader comes up and a + // replicator start is triggered again on the new leader. + if (_partition->term() == term) { + ssx::spawn_with_gate(_gate, [this, term] { + return _partition->raft()->step_down( + fmt::format("Unable to start replicator in term: {}", term)); + }); + } + } + +private: + ss::gate _gate; + ss::lw_shared_ptr _partition; + ss::shared_ptr _stm; + // set in start(); + std::optional _last_replicated_offset; +}; + +class local_partition_data_sink_factory + : public replication::data_sink_factory { +public: + explicit local_partition_data_sink_factory( + ss::sharded& pm) + : _partition_manager(pm) {} + + std::unique_ptr + make_sink(const ::model::ntp& ntp) final { + auto partition = _partition_manager.local().get(ntp); + if (!partition) { + throw std::runtime_error( + fmt::format("Partition not found: {} on this shard", ntp)); + } + return make_default_data_sink(std::move(partition)); + } + +private: + ss::sharded& _partition_manager; +}; + +std::unique_ptr make_default_data_source( + const ::model::topic_partition& tp, + replication::mux_remote_consumer& consumer) { + return std::make_unique(tp, consumer); +} + +std::unique_ptr +make_default_data_sink(ss::lw_shared_ptr partition) { + return std::make_unique(std::move(partition)); +} class default_link_factory : public link_factory { public: @@ -134,36 +427,17 @@ class default_link_factory : public link_factory { link_reconciler_period, std::move(config), std::move(cluster_connection), - std::make_unique( - make_remote_consumer(*cluster_connection, config.connection)), - std::make_unique(*_partition_manager)); + std::make_unique( + link_id, + manager, + std::make_unique( + *cluster_connection, + make_remote_consumer_configuration(config.connection))), + std::make_unique( + *_partition_manager)); } private: - std::unique_ptr make_remote_consumer( - kafka::client::cluster& cluster, - const model::connection_config& conn_cfg) { - // todo0: make more these configurable at connection level - // todo1: make these dynamic - kafka::client::direct_consumer::configuration cfg; - cfg.min_bytes = conn_cfg.get_fetch_min_bytes(); - cfg.max_fetch_size = conn_cfg.get_fetch_max_bytes(); - cfg.partition_max_bytes = 512_KiB; - cfg.max_wait_time = 200ms; - cfg.isolation_level = ::model::isolation_level::read_committed; - cfg.max_buffered_bytes = 5_MiB; - cfg.max_buffered_elements = std::numeric_limits::max(); - cfg.with_sessions = kafka::client::fetch_sessions_enabled::yes; - static constexpr size_t partition_max_buffered_bytes = 5_MiB; - static constexpr auto fetch_max_wait = 100ms; - auto direct_consumer = std::make_unique( - cluster, cfg); - - return std::make_unique( - std::move(direct_consumer), - partition_max_buffered_bytes, - fetch_max_wait); - } ss::sharded* _partition_manager; }; diff --git a/src/v/cluster_link/service.h b/src/v/cluster_link/service.h index 10880b97930b5..b81aa4eabf3bc 100644 --- a/src/v/cluster_link/service.h +++ b/src/v/cluster_link/service.h @@ -26,6 +26,11 @@ #include namespace cluster_link { +namespace replication { +class data_source; +class data_sink; +class mux_remote_consumer; +} // namespace replication /** * @brief API access for cluster link service */ @@ -116,4 +121,11 @@ class service { std::vector>> _notification_cleanups; }; + +std::unique_ptr make_default_data_source( + const ::model::topic_partition& tp, + replication::mux_remote_consumer& consumer); + +std::unique_ptr +make_default_data_sink(ss::lw_shared_ptr partition); } // namespace cluster_link diff --git a/src/v/cluster_link/tests/BUILD b/src/v/cluster_link/tests/BUILD index 36f827edcadbf..e04c70a9411d0 100644 --- a/src/v/cluster_link/tests/BUILD +++ b/src/v/cluster_link/tests/BUILD @@ -163,3 +163,25 @@ redpanda_cc_gtest( "@googletest//:gtest", ], ) + +redpanda_cc_gtest( + name = "partition_replicator_fixture_test", + timeout = "short", + srcs = [ + "partition_replicator_fixture_tests.cc", + ], + cpu = 1, + deps = [ + "//src/v/cluster_link", + "//src/v/cluster_link/replication:deps", + "//src/v/cluster_link/replication:mux_remote_consumer", + "//src/v/cluster_link/replication:partition_replicator", + "//src/v/kafka/client/direct_consumer/tests:direct_consumer_fixture", + "//src/v/model", + "//src/v/redpanda/tests:fixture", + "@fmt", + "@googletest//:gtest", + "@seastar", + "@seastar//:testing", + ], +) diff --git a/src/v/cluster_link/replication/tests/partition_replicator_fixture_tests.cc b/src/v/cluster_link/tests/partition_replicator_fixture_tests.cc similarity index 80% rename from src/v/cluster_link/replication/tests/partition_replicator_fixture_tests.cc rename to src/v/cluster_link/tests/partition_replicator_fixture_tests.cc index 084cc5bcf25b8..c754e6410a778 100644 --- a/src/v/cluster_link/replication/tests/partition_replicator_fixture_tests.cc +++ b/src/v/cluster_link/tests/partition_replicator_fixture_tests.cc @@ -8,15 +8,16 @@ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ -#include "cluster_link/replication/deps_impl.h" +#include "cluster_link/replication/deps.h" #include "cluster_link/replication/mux_remote_consumer.h" #include "cluster_link/replication/partition_replicator.h" +#include "cluster_link/service.h" #include "kafka/client/direct_consumer/tests/direct_consumer_fixture.h" namespace { ss::logger logger{"replicator-fixture-test"}; } -using namespace cluster_link::replication; + using BasicConsumerFixture = kafka::client::tests::basic_consumer_fixture; static constexpr std::chrono::milliseconds fetch_max_wait{10}; @@ -26,9 +27,19 @@ class ReplicatorFixture : public BasicConsumerFixture { public: void SetUp() override { basic_consumer_fixture::SetUp(); - auto consumer = make_consumer(); - _mux_consumer = std::make_unique( - make_consumer(), partition_max_buffered, fetch_max_wait); + + direct_consumer::configuration dc_cfg{ + .with_sessions = fetch_sessions_enabled{ + GetParam() == kafka::client::tests::session_config::with_sessions}}; + + _mux_consumer + = std::make_unique( + *cluster, + cluster_link::replication::mux_remote_consumer::configuration{ + .direct_consumer_configuration = dc_cfg, + .partition_max_buffered = partition_max_buffered, + .fetch_max_wait = fetch_max_wait}); + _mux_consumer->start().get(); // create source and target topics; create_topic(model::topic_namespace_view{_source}).get(); @@ -37,11 +48,12 @@ class ReplicatorFixture : public BasicConsumerFixture { auto [_, partition] = get_leader(_target); vassert(partition, "no partition for {}", _target); - auto source = std::make_unique( + auto source = cluster_link::make_default_data_source( _source.tp, *_mux_consumer); - auto sink = std::make_unique(partition); - _replicator = std::make_unique( - _source, model::term_id{0}, std::move(source), std::move(sink)); + auto sink = cluster_link::make_default_data_sink(partition); + _replicator + = std::make_unique( + _source, model::term_id{0}, std::move(source), std::move(sink)); _replicator->start().get(); } @@ -72,8 +84,10 @@ class ReplicatorFixture : public BasicConsumerFixture { void StopConsumer() override {} protected: - std::unique_ptr _mux_consumer; - std::unique_ptr _replicator; + std::unique_ptr + _mux_consumer; + std::unique_ptr + _replicator; model::ntp _source{model::kafka_namespace, "source", 0}; model::ntp _target{model::kafka_namespace, "target", 0}; }; diff --git a/src/v/redpanda/admin/services/shadow_link/converter.cc b/src/v/redpanda/admin/services/shadow_link/converter.cc index 8e0b7b39b407f..6dc7d7af9b409 100644 --- a/src/v/redpanda/admin/services/shadow_link/converter.cc +++ b/src/v/redpanda/admin/services/shadow_link/converter.cc @@ -434,6 +434,11 @@ create_connection_config(const shadow_link& sl) { config.fetch_max_bytes = client_options.get_fetch_max_bytes(); } + if (client_options.get_fetch_partition_max_bytes() != 0) { + config.fetch_partition_max_bytes + = client_options.get_fetch_partition_max_bytes(); + } + return config; } @@ -583,6 +588,8 @@ create_shadow_link_client_options(const cluster_link::model::metadata& md) { options.set_fetch_wait_max_ms(md.connection.fetch_wait_max_ms.value_or(0)); options.set_fetch_min_bytes(md.connection.fetch_min_bytes.value_or(0)); options.set_fetch_max_bytes(md.connection.fetch_max_bytes.value_or(0)); + options.set_fetch_partition_max_bytes( + md.connection.fetch_partition_max_bytes.value_or(0)); return options; } diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.py b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.py index 5cac816123be7..4f0df29f03979 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.py +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.py @@ -15,7 +15,7 @@ from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 from google.protobuf import field_mask_pb2 as google_dot_protobuf_dot_field__mask__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n.proto/redpanda/core/admin/v2/shadow_link.proto\x12\x16redpanda.core.admin.v2\x1a"proto/redpanda/pbgen/options.proto\x1a\x1eproto/redpanda/pbgen/rpc.proto\x1a$proto/redpanda/core/common/acl.proto\x1a\x1fgoogle/api/field_behavior.proto\x1a\x1bgoogle/api/field_info.proto\x1a\x19google/api/resource.proto\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a google/protobuf/field_mask.proto"\xc2\x01\n\nShadowLink\x12\x11\n\x04name\x18\x01 \x01(\tB\x03\xe0A\x02\x12\x18\n\x03uid\x18\x02 \x01(\tB\x0b\xe0A\x03\xe2\x8c\xcf\xd7\x08\x02\x08\x01\x12H\n\x0econfigurations\x18\x03 \x01(\x0b20.redpanda.core.admin.v2.ShadowLinkConfigurations\x12=\n\x06status\x18\x04 \x01(\x0b2(.redpanda.core.admin.v2.ShadowLinkStatusB\x03\xe0A\x03"R\n\x17CreateShadowLinkRequest\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"S\n\x18CreateShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"a\n\x17DeleteShadowLinkRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink"\x1a\n\x18DeleteShadowLinkResponse"^\n\x14GetShadowLinkRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink"P\n\x15GetShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"\x18\n\x16ListShadowLinksRequest"S\n\x17ListShadowLinksResponse\x128\n\x0cshadow_links\x18\x01 \x03(\x0b2".redpanda.core.admin.v2.ShadowLink"\x83\x01\n\x17UpdateShadowLinkRequest\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink\x12/\n\x0bupdate_mask\x18\x02 \x01(\x0b2\x1a.google.protobuf.FieldMask"S\n\x18UpdateShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"y\n\x0fFailOverRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink\x12\x1e\n\x11shadow_topic_name\x18\x02 \x01(\tB\x03\xe0A\x01"K\n\x10FailOverResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"\xe7\x02\n\x18ShadowLinkConfigurations\x12G\n\x0eclient_options\x18\x01 \x01(\x0b2/.redpanda.core.admin.v2.ShadowLinkClientOptions\x12U\n\x1btopic_metadata_sync_options\x18\x02 \x01(\x0b20.redpanda.core.admin.v2.TopicMetadataSyncOptions\x12W\n\x1cconsumer_offset_sync_options\x18\x03 \x01(\x0b21.redpanda.core.admin.v2.ConsumerOffsetSyncOptions\x12R\n\x15security_sync_options\x18\x04 \x01(\x0b23.redpanda.core.admin.v2.SecuritySettingsSyncOptions"\xe1\x03\n\x17ShadowLinkClientOptions\x12\x1e\n\x11bootstrap_servers\x18\x01 \x03(\tB\x03\xe0A\x02\x12\x16\n\tclient_id\x18\x02 \x01(\tB\x03\xe0A\x03\x12\x19\n\x11source_cluster_id\x18\x03 \x01(\t\x12>\n\x0ctls_settings\x18\x04 \x01(\x0b2#.redpanda.core.admin.v2.TLSSettingsH\x00\x88\x01\x01\x12^\n\x1cauthentication_configuration\x18\x05 \x01(\x0b23.redpanda.core.admin.v2.AuthenticationConfigurationH\x01\x88\x01\x01\x12\x1b\n\x13metadata_max_age_ms\x18\x06 \x01(\x05\x12\x1d\n\x15connection_timeout_ms\x18\x07 \x01(\x05\x12\x18\n\x10retry_backoff_ms\x18\x08 \x01(\x05\x12\x19\n\x11fetch_wait_max_ms\x18\t \x01(\x05\x12\x17\n\x0ffetch_min_bytes\x18\n \x01(\x05\x12\x17\n\x0ffetch_max_bytes\x18\x0b \x01(\x05B\x0f\n\r_tls_settingsB\x1f\n\x1d_authentication_configuration"\xb8\x01\n\x18TopicMetadataSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12L\n auto_create_shadow_topic_filters\x18\x02 \x03(\x0b2".redpanda.core.admin.v2.NameFilter\x12!\n\x19shadowed_topic_properties\x18\x03 \x03(\t"\x94\x01\n\x19ConsumerOffsetSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12\x0f\n\x07enabled\x18\x02 \x01(\x08\x129\n\rgroup_filters\x18\x03 \x03(\x0b2".redpanda.core.admin.v2.NameFilter"\x8d\x02\n\x1bSecuritySettingsSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12\x0f\n\x07enabled\x18\x02 \x01(\x08\x128\n\x0crole_filters\x18\x03 \x03(\x0b2".redpanda.core.admin.v2.NameFilter\x12>\n\x12scram_cred_filters\x18\x04 \x03(\x0b2".redpanda.core.admin.v2.NameFilter\x126\n\x0bacl_filters\x18\x05 \x03(\x0b2!.redpanda.core.admin.v2.ACLFilter"\xa7\x01\n\x0bTLSSettings\x12D\n\x11tls_file_settings\x18\x01 \x01(\x0b2\'.redpanda.core.admin.v2.TLSFileSettingsH\x00\x12B\n\x10tls_pem_settings\x18\x02 \x01(\x0b2&.redpanda.core.admin.v2.TLSPEMSettingsH\x00B\x0e\n\x0ctls_settings"s\n\x1bAuthenticationConfiguration\x12B\n\x13scram_configuration\x18\x01 \x01(\x0b2#.redpanda.core.admin.v2.ScramConfigH\x00B\x10\n\x0eauthentication"G\n\x0fTLSFileSettings\x12\x0f\n\x07ca_path\x18\x01 \x01(\t\x12\x10\n\x08key_path\x18\x02 \x01(\t\x12\x11\n\tcert_path\x18\x03 \x01(\t"Z\n\x0eTLSPEMSettings\x12\n\n\x02ca\x18\x01 \x01(\t\x12\x10\n\x03key\x18\x02 \x01(\tB\x03\xe0A\x04\x12\x1c\n\x0fkey_fingerprint\x18\x03 \x01(\tB\x03\xe0A\x03\x12\x0c\n\x04cert\x18\x04 \x01(\t"\xcc\x01\n\x0bScramConfig\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x15\n\x08password\x18\x02 \x01(\tB\x03\xe0A\x04\x12\x19\n\x0cpassword_set\x18\x03 \x01(\x08B\x03\xe0A\x03\x128\n\x0fpassword_set_at\x18\x04 \x01(\x0b2\x1a.google.protobuf.TimestampB\x03\xe0A\x03\x12?\n\x0fscram_mechanism\x18\x05 \x01(\x0e2&.redpanda.core.admin.v2.ScramMechanism"\x8e\x01\n\nNameFilter\x129\n\x0cpattern_type\x18\x01 \x01(\x0e2#.redpanda.core.admin.v2.PatternType\x127\n\x0bfilter_type\x18\x02 \x01(\x0e2".redpanda.core.admin.v2.FilterType\x12\x0c\n\x04name\x18\x03 \x01(\t"\x8f\x01\n\tACLFilter\x12B\n\x0fresource_filter\x18\x01 \x01(\x0b2).redpanda.core.admin.v2.ACLResourceFilter\x12>\n\raccess_filter\x18\x02 \x01(\x0b2\'.redpanda.core.admin.v2.ACLAccessFilter"\x93\x01\n\x11ACLResourceFilter\x128\n\rresource_type\x18\x01 \x01(\x0e2!.redpanda.core.common.ACLResource\x126\n\x0cpattern_type\x18\x02 \x01(\x0e2 .redpanda.core.common.ACLPattern\x12\x0c\n\x04name\x18\x03 \x01(\t"\xab\x01\n\x0fACLAccessFilter\x12\x11\n\tprincipal\x18\x01 \x01(\t\x125\n\toperation\x18\x02 \x01(\x0e2".redpanda.core.common.ACLOperation\x12@\n\x0fpermission_type\x18\x03 \x01(\x0e2\'.redpanda.core.common.ACLPermissionType\x12\x0c\n\x04host\x18\x04 \x01(\t"\xd9\x01\n\x10ShadowLinkStatus\x126\n\x05state\x18\x01 \x01(\x0e2\'.redpanda.core.admin.v2.ShadowLinkState\x12C\n\rtask_statuses\x18\x02 \x03(\x0b2,.redpanda.core.admin.v2.ShadowLinkTaskStatus\x12H\n\x15shadow_topic_statuses\x18\x03 \x03(\x0b2).redpanda.core.admin.v2.ShadowTopicStatus"y\n\x14ShadowLinkTaskStatus\x12\x0c\n\x04name\x18\x01 \x01(\t\x120\n\x05state\x18\x02 \x01(\x0e2!.redpanda.core.admin.v2.TaskState\x12\x0e\n\x06reason\x18\x03 \x01(\t\x12\x11\n\tbroker_id\x18\x04 \x01(\x05"\xbe\x01\n\x11ShadowTopicStatus\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08topic_id\x18\x02 \x01(\t\x127\n\x05state\x18\x03 \x01(\x0e2(.redpanda.core.admin.v2.ShadowTopicState\x12P\n\x15partition_information\x18\x04 \x03(\x0b21.redpanda.core.admin.v2.TopicPartitionInformation"\x8b\x01\n\x19TopicPartitionInformation\x12\x14\n\x0cpartition_id\x18\x01 \x01(\x03\x12!\n\x19source_last_stable_offset\x18\x02 \x01(\x03\x12\x1d\n\x15source_high_watermark\x18\x03 \x01(\x03\x12\x16\n\x0ehigh_watermark\x18\x04 \x01(\x03*\xb7\x01\n\x0fShadowLinkState\x12!\n\x1dSHADOW_LINK_STATE_UNSPECIFIED\x10\x00\x12\x1c\n\x18SHADOW_LINK_STATE_ACTIVE\x10\x01\x12\x1c\n\x18SHADOW_LINK_STATE_PAUSED\x10\x02\x12"\n\x1eSHADOW_LINK_STATE_FAILING_OVER\x10\x03\x12!\n\x1dSHADOW_LINK_STATE_FAILED_OVER\x10\x04*w\n\x0eScramMechanism\x12\x1f\n\x1bSCRAM_MECHANISM_UNSPECIFIED\x10\x00\x12!\n\x1dSCRAM_MECHANISM_SCRAM_SHA_256\x10\x01\x12!\n\x1dSCRAM_MECHANISM_SCRAM_SHA_512\x10\x02*^\n\x0bPatternType\x12\x1c\n\x18PATTERN_TYPE_UNSPECIFIED\x10\x00\x12\x18\n\x14PATTERN_TYPE_LITERAL\x10\x01\x12\x17\n\x13PATTERN_TYPE_PREFIX\x10\x02*[\n\nFilterType\x12\x1b\n\x17FILTER_TYPE_UNSPECIFIED\x10\x00\x12\x17\n\x13FILTER_TYPE_INCLUDE\x10\x01\x12\x17\n\x13FILTER_TYPE_EXCLUDE\x10\x02*\xaa\x01\n\tTaskState\x12\x1a\n\x16TASK_STATE_UNSPECIFIED\x10\x00\x12\x15\n\x11TASK_STATE_ACTIVE\x10\x01\x12\x15\n\x11TASK_STATE_PAUSED\x10\x02\x12\x1f\n\x1bTASK_STATE_LINK_UNAVAILABLE\x10\x03\x12\x1a\n\x16TASK_STATE_NOT_RUNNING\x10\x04\x12\x16\n\x12TASK_STATE_FAULTED\x10\x05*\xb5\x01\n\x10ShadowTopicState\x12"\n\x1eSHADOW_TOPIC_STATE_UNSPECIFIED\x10\x00\x12\x1d\n\x19SHADOW_TOPIC_STATE_ACTIVE\x10\x01\x12\x1f\n\x1bSHADOW_TOPIC_STATE_PROMOTED\x10\x02\x12\x1e\n\x1aSHADOW_TOPIC_STATE_FAULTED\x10\x03\x12\x1d\n\x19SHADOW_TOPIC_STATE_PAUSED\x10\x042\xe9\x05\n\x11ShadowLinkService\x12}\n\x10CreateShadowLink\x12/.redpanda.core.admin.v2.CreateShadowLinkRequest\x1a0.redpanda.core.admin.v2.CreateShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12}\n\x10DeleteShadowLink\x12/.redpanda.core.admin.v2.DeleteShadowLinkRequest\x1a0.redpanda.core.admin.v2.DeleteShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12t\n\rGetShadowLink\x12,.redpanda.core.admin.v2.GetShadowLinkRequest\x1a-.redpanda.core.admin.v2.GetShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12z\n\x0fListShadowLinks\x12..redpanda.core.admin.v2.ListShadowLinksRequest\x1a/.redpanda.core.admin.v2.ListShadowLinksResponse"\x06\xea\x92\x19\x02\x10\x03\x12}\n\x10UpdateShadowLink\x12/.redpanda.core.admin.v2.UpdateShadowLinkRequest\x1a0.redpanda.core.admin.v2.UpdateShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12e\n\x08FailOver\x12\'.redpanda.core.admin.v2.FailOverRequest\x1a(.redpanda.core.admin.v2.FailOverResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n.proto/redpanda/core/admin/v2/shadow_link.proto\x12\x16redpanda.core.admin.v2\x1a"proto/redpanda/pbgen/options.proto\x1a\x1eproto/redpanda/pbgen/rpc.proto\x1a$proto/redpanda/core/common/acl.proto\x1a\x1fgoogle/api/field_behavior.proto\x1a\x1bgoogle/api/field_info.proto\x1a\x19google/api/resource.proto\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a google/protobuf/field_mask.proto"\xc2\x01\n\nShadowLink\x12\x11\n\x04name\x18\x01 \x01(\tB\x03\xe0A\x02\x12\x18\n\x03uid\x18\x02 \x01(\tB\x0b\xe0A\x03\xe2\x8c\xcf\xd7\x08\x02\x08\x01\x12H\n\x0econfigurations\x18\x03 \x01(\x0b20.redpanda.core.admin.v2.ShadowLinkConfigurations\x12=\n\x06status\x18\x04 \x01(\x0b2(.redpanda.core.admin.v2.ShadowLinkStatusB\x03\xe0A\x03"R\n\x17CreateShadowLinkRequest\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"S\n\x18CreateShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"a\n\x17DeleteShadowLinkRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink"\x1a\n\x18DeleteShadowLinkResponse"^\n\x14GetShadowLinkRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink"P\n\x15GetShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"\x18\n\x16ListShadowLinksRequest"S\n\x17ListShadowLinksResponse\x128\n\x0cshadow_links\x18\x01 \x03(\x0b2".redpanda.core.admin.v2.ShadowLink"\x83\x01\n\x17UpdateShadowLinkRequest\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink\x12/\n\x0bupdate_mask\x18\x02 \x01(\x0b2\x1a.google.protobuf.FieldMask"S\n\x18UpdateShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"y\n\x0fFailOverRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink\x12\x1e\n\x11shadow_topic_name\x18\x02 \x01(\tB\x03\xe0A\x01"K\n\x10FailOverResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"\xe7\x02\n\x18ShadowLinkConfigurations\x12G\n\x0eclient_options\x18\x01 \x01(\x0b2/.redpanda.core.admin.v2.ShadowLinkClientOptions\x12U\n\x1btopic_metadata_sync_options\x18\x02 \x01(\x0b20.redpanda.core.admin.v2.TopicMetadataSyncOptions\x12W\n\x1cconsumer_offset_sync_options\x18\x03 \x01(\x0b21.redpanda.core.admin.v2.ConsumerOffsetSyncOptions\x12R\n\x15security_sync_options\x18\x04 \x01(\x0b23.redpanda.core.admin.v2.SecuritySettingsSyncOptions"\x84\x04\n\x17ShadowLinkClientOptions\x12\x1e\n\x11bootstrap_servers\x18\x01 \x03(\tB\x03\xe0A\x02\x12\x16\n\tclient_id\x18\x02 \x01(\tB\x03\xe0A\x03\x12\x19\n\x11source_cluster_id\x18\x03 \x01(\t\x12>\n\x0ctls_settings\x18\x04 \x01(\x0b2#.redpanda.core.admin.v2.TLSSettingsH\x00\x88\x01\x01\x12^\n\x1cauthentication_configuration\x18\x05 \x01(\x0b23.redpanda.core.admin.v2.AuthenticationConfigurationH\x01\x88\x01\x01\x12\x1b\n\x13metadata_max_age_ms\x18\x06 \x01(\x05\x12\x1d\n\x15connection_timeout_ms\x18\x07 \x01(\x05\x12\x18\n\x10retry_backoff_ms\x18\x08 \x01(\x05\x12\x19\n\x11fetch_wait_max_ms\x18\t \x01(\x05\x12\x17\n\x0ffetch_min_bytes\x18\n \x01(\x05\x12\x17\n\x0ffetch_max_bytes\x18\x0b \x01(\x05\x12!\n\x19fetch_partition_max_bytes\x18\x0c \x01(\x05B\x0f\n\r_tls_settingsB\x1f\n\x1d_authentication_configuration"\xb8\x01\n\x18TopicMetadataSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12L\n auto_create_shadow_topic_filters\x18\x02 \x03(\x0b2".redpanda.core.admin.v2.NameFilter\x12!\n\x19shadowed_topic_properties\x18\x03 \x03(\t"\x94\x01\n\x19ConsumerOffsetSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12\x0f\n\x07enabled\x18\x02 \x01(\x08\x129\n\rgroup_filters\x18\x03 \x03(\x0b2".redpanda.core.admin.v2.NameFilter"\x8d\x02\n\x1bSecuritySettingsSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12\x0f\n\x07enabled\x18\x02 \x01(\x08\x128\n\x0crole_filters\x18\x03 \x03(\x0b2".redpanda.core.admin.v2.NameFilter\x12>\n\x12scram_cred_filters\x18\x04 \x03(\x0b2".redpanda.core.admin.v2.NameFilter\x126\n\x0bacl_filters\x18\x05 \x03(\x0b2!.redpanda.core.admin.v2.ACLFilter"\xa7\x01\n\x0bTLSSettings\x12D\n\x11tls_file_settings\x18\x01 \x01(\x0b2\'.redpanda.core.admin.v2.TLSFileSettingsH\x00\x12B\n\x10tls_pem_settings\x18\x02 \x01(\x0b2&.redpanda.core.admin.v2.TLSPEMSettingsH\x00B\x0e\n\x0ctls_settings"s\n\x1bAuthenticationConfiguration\x12B\n\x13scram_configuration\x18\x01 \x01(\x0b2#.redpanda.core.admin.v2.ScramConfigH\x00B\x10\n\x0eauthentication"G\n\x0fTLSFileSettings\x12\x0f\n\x07ca_path\x18\x01 \x01(\t\x12\x10\n\x08key_path\x18\x02 \x01(\t\x12\x11\n\tcert_path\x18\x03 \x01(\t"Z\n\x0eTLSPEMSettings\x12\n\n\x02ca\x18\x01 \x01(\t\x12\x10\n\x03key\x18\x02 \x01(\tB\x03\xe0A\x04\x12\x1c\n\x0fkey_fingerprint\x18\x03 \x01(\tB\x03\xe0A\x03\x12\x0c\n\x04cert\x18\x04 \x01(\t"\xcc\x01\n\x0bScramConfig\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x15\n\x08password\x18\x02 \x01(\tB\x03\xe0A\x04\x12\x19\n\x0cpassword_set\x18\x03 \x01(\x08B\x03\xe0A\x03\x128\n\x0fpassword_set_at\x18\x04 \x01(\x0b2\x1a.google.protobuf.TimestampB\x03\xe0A\x03\x12?\n\x0fscram_mechanism\x18\x05 \x01(\x0e2&.redpanda.core.admin.v2.ScramMechanism"\x8e\x01\n\nNameFilter\x129\n\x0cpattern_type\x18\x01 \x01(\x0e2#.redpanda.core.admin.v2.PatternType\x127\n\x0bfilter_type\x18\x02 \x01(\x0e2".redpanda.core.admin.v2.FilterType\x12\x0c\n\x04name\x18\x03 \x01(\t"\x8f\x01\n\tACLFilter\x12B\n\x0fresource_filter\x18\x01 \x01(\x0b2).redpanda.core.admin.v2.ACLResourceFilter\x12>\n\raccess_filter\x18\x02 \x01(\x0b2\'.redpanda.core.admin.v2.ACLAccessFilter"\x93\x01\n\x11ACLResourceFilter\x128\n\rresource_type\x18\x01 \x01(\x0e2!.redpanda.core.common.ACLResource\x126\n\x0cpattern_type\x18\x02 \x01(\x0e2 .redpanda.core.common.ACLPattern\x12\x0c\n\x04name\x18\x03 \x01(\t"\xab\x01\n\x0fACLAccessFilter\x12\x11\n\tprincipal\x18\x01 \x01(\t\x125\n\toperation\x18\x02 \x01(\x0e2".redpanda.core.common.ACLOperation\x12@\n\x0fpermission_type\x18\x03 \x01(\x0e2\'.redpanda.core.common.ACLPermissionType\x12\x0c\n\x04host\x18\x04 \x01(\t"\xd9\x01\n\x10ShadowLinkStatus\x126\n\x05state\x18\x01 \x01(\x0e2\'.redpanda.core.admin.v2.ShadowLinkState\x12C\n\rtask_statuses\x18\x02 \x03(\x0b2,.redpanda.core.admin.v2.ShadowLinkTaskStatus\x12H\n\x15shadow_topic_statuses\x18\x03 \x03(\x0b2).redpanda.core.admin.v2.ShadowTopicStatus"y\n\x14ShadowLinkTaskStatus\x12\x0c\n\x04name\x18\x01 \x01(\t\x120\n\x05state\x18\x02 \x01(\x0e2!.redpanda.core.admin.v2.TaskState\x12\x0e\n\x06reason\x18\x03 \x01(\t\x12\x11\n\tbroker_id\x18\x04 \x01(\x05"\xbe\x01\n\x11ShadowTopicStatus\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x10\n\x08topic_id\x18\x02 \x01(\t\x127\n\x05state\x18\x03 \x01(\x0e2(.redpanda.core.admin.v2.ShadowTopicState\x12P\n\x15partition_information\x18\x04 \x03(\x0b21.redpanda.core.admin.v2.TopicPartitionInformation"\x8b\x01\n\x19TopicPartitionInformation\x12\x14\n\x0cpartition_id\x18\x01 \x01(\x03\x12!\n\x19source_last_stable_offset\x18\x02 \x01(\x03\x12\x1d\n\x15source_high_watermark\x18\x03 \x01(\x03\x12\x16\n\x0ehigh_watermark\x18\x04 \x01(\x03*\xb7\x01\n\x0fShadowLinkState\x12!\n\x1dSHADOW_LINK_STATE_UNSPECIFIED\x10\x00\x12\x1c\n\x18SHADOW_LINK_STATE_ACTIVE\x10\x01\x12\x1c\n\x18SHADOW_LINK_STATE_PAUSED\x10\x02\x12"\n\x1eSHADOW_LINK_STATE_FAILING_OVER\x10\x03\x12!\n\x1dSHADOW_LINK_STATE_FAILED_OVER\x10\x04*w\n\x0eScramMechanism\x12\x1f\n\x1bSCRAM_MECHANISM_UNSPECIFIED\x10\x00\x12!\n\x1dSCRAM_MECHANISM_SCRAM_SHA_256\x10\x01\x12!\n\x1dSCRAM_MECHANISM_SCRAM_SHA_512\x10\x02*^\n\x0bPatternType\x12\x1c\n\x18PATTERN_TYPE_UNSPECIFIED\x10\x00\x12\x18\n\x14PATTERN_TYPE_LITERAL\x10\x01\x12\x17\n\x13PATTERN_TYPE_PREFIX\x10\x02*[\n\nFilterType\x12\x1b\n\x17FILTER_TYPE_UNSPECIFIED\x10\x00\x12\x17\n\x13FILTER_TYPE_INCLUDE\x10\x01\x12\x17\n\x13FILTER_TYPE_EXCLUDE\x10\x02*\xaa\x01\n\tTaskState\x12\x1a\n\x16TASK_STATE_UNSPECIFIED\x10\x00\x12\x15\n\x11TASK_STATE_ACTIVE\x10\x01\x12\x15\n\x11TASK_STATE_PAUSED\x10\x02\x12\x1f\n\x1bTASK_STATE_LINK_UNAVAILABLE\x10\x03\x12\x1a\n\x16TASK_STATE_NOT_RUNNING\x10\x04\x12\x16\n\x12TASK_STATE_FAULTED\x10\x05*\xb5\x01\n\x10ShadowTopicState\x12"\n\x1eSHADOW_TOPIC_STATE_UNSPECIFIED\x10\x00\x12\x1d\n\x19SHADOW_TOPIC_STATE_ACTIVE\x10\x01\x12\x1f\n\x1bSHADOW_TOPIC_STATE_PROMOTED\x10\x02\x12\x1e\n\x1aSHADOW_TOPIC_STATE_FAULTED\x10\x03\x12\x1d\n\x19SHADOW_TOPIC_STATE_PAUSED\x10\x042\xe9\x05\n\x11ShadowLinkService\x12}\n\x10CreateShadowLink\x12/.redpanda.core.admin.v2.CreateShadowLinkRequest\x1a0.redpanda.core.admin.v2.CreateShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12}\n\x10DeleteShadowLink\x12/.redpanda.core.admin.v2.DeleteShadowLinkRequest\x1a0.redpanda.core.admin.v2.DeleteShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12t\n\rGetShadowLink\x12,.redpanda.core.admin.v2.GetShadowLinkRequest\x1a-.redpanda.core.admin.v2.GetShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12z\n\x0fListShadowLinks\x12..redpanda.core.admin.v2.ListShadowLinksRequest\x1a/.redpanda.core.admin.v2.ListShadowLinksResponse"\x06\xea\x92\x19\x02\x10\x03\x12}\n\x10UpdateShadowLink\x12/.redpanda.core.admin.v2.UpdateShadowLinkRequest\x1a0.redpanda.core.admin.v2.UpdateShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12e\n\x08FailOver\x12\'.redpanda.core.admin.v2.FailOverRequest\x1a(.redpanda.core.admin.v2.FailOverResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'proto.redpanda.core.admin.v2.shadow_link_pb2', _globals) @@ -62,18 +62,18 @@ _globals['_SHADOWLINKSERVICE'].methods_by_name['UpdateShadowLink']._serialized_options = b'\xea\x92\x19\x02\x10\x03' _globals['_SHADOWLINKSERVICE'].methods_by_name['FailOver']._loaded_options = None _globals['_SHADOWLINKSERVICE'].methods_by_name['FailOver']._serialized_options = b'\xea\x92\x19\x02\x10\x03' - _globals['_SHADOWLINKSTATE']._serialized_start = 4978 - _globals['_SHADOWLINKSTATE']._serialized_end = 5161 - _globals['_SCRAMMECHANISM']._serialized_start = 5163 - _globals['_SCRAMMECHANISM']._serialized_end = 5282 - _globals['_PATTERNTYPE']._serialized_start = 5284 - _globals['_PATTERNTYPE']._serialized_end = 5378 - _globals['_FILTERTYPE']._serialized_start = 5380 - _globals['_FILTERTYPE']._serialized_end = 5471 - _globals['_TASKSTATE']._serialized_start = 5474 - _globals['_TASKSTATE']._serialized_end = 5644 - _globals['_SHADOWTOPICSTATE']._serialized_start = 5647 - _globals['_SHADOWTOPICSTATE']._serialized_end = 5828 + _globals['_SHADOWLINKSTATE']._serialized_start = 5013 + _globals['_SHADOWLINKSTATE']._serialized_end = 5196 + _globals['_SCRAMMECHANISM']._serialized_start = 5198 + _globals['_SCRAMMECHANISM']._serialized_end = 5317 + _globals['_PATTERNTYPE']._serialized_start = 5319 + _globals['_PATTERNTYPE']._serialized_end = 5413 + _globals['_FILTERTYPE']._serialized_start = 5415 + _globals['_FILTERTYPE']._serialized_end = 5506 + _globals['_TASKSTATE']._serialized_start = 5509 + _globals['_TASKSTATE']._serialized_end = 5679 + _globals['_SHADOWTOPICSTATE']._serialized_start = 5682 + _globals['_SHADOWTOPICSTATE']._serialized_end = 5863 _globals['_SHADOWLINK']._serialized_start = 369 _globals['_SHADOWLINK']._serialized_end = 563 _globals['_CREATESHADOWLINKREQUEST']._serialized_start = 565 @@ -103,38 +103,38 @@ _globals['_SHADOWLINKCONFIGURATIONS']._serialized_start = 1570 _globals['_SHADOWLINKCONFIGURATIONS']._serialized_end = 1929 _globals['_SHADOWLINKCLIENTOPTIONS']._serialized_start = 1932 - _globals['_SHADOWLINKCLIENTOPTIONS']._serialized_end = 2413 - _globals['_TOPICMETADATASYNCOPTIONS']._serialized_start = 2416 - _globals['_TOPICMETADATASYNCOPTIONS']._serialized_end = 2600 - _globals['_CONSUMEROFFSETSYNCOPTIONS']._serialized_start = 2603 - _globals['_CONSUMEROFFSETSYNCOPTIONS']._serialized_end = 2751 - _globals['_SECURITYSETTINGSSYNCOPTIONS']._serialized_start = 2754 - _globals['_SECURITYSETTINGSSYNCOPTIONS']._serialized_end = 3023 - _globals['_TLSSETTINGS']._serialized_start = 3026 - _globals['_TLSSETTINGS']._serialized_end = 3193 - _globals['_AUTHENTICATIONCONFIGURATION']._serialized_start = 3195 - _globals['_AUTHENTICATIONCONFIGURATION']._serialized_end = 3310 - _globals['_TLSFILESETTINGS']._serialized_start = 3312 - _globals['_TLSFILESETTINGS']._serialized_end = 3383 - _globals['_TLSPEMSETTINGS']._serialized_start = 3385 - _globals['_TLSPEMSETTINGS']._serialized_end = 3475 - _globals['_SCRAMCONFIG']._serialized_start = 3478 - _globals['_SCRAMCONFIG']._serialized_end = 3682 - _globals['_NAMEFILTER']._serialized_start = 3685 - _globals['_NAMEFILTER']._serialized_end = 3827 - _globals['_ACLFILTER']._serialized_start = 3830 - _globals['_ACLFILTER']._serialized_end = 3973 - _globals['_ACLRESOURCEFILTER']._serialized_start = 3976 - _globals['_ACLRESOURCEFILTER']._serialized_end = 4123 - _globals['_ACLACCESSFILTER']._serialized_start = 4126 - _globals['_ACLACCESSFILTER']._serialized_end = 4297 - _globals['_SHADOWLINKSTATUS']._serialized_start = 4300 - _globals['_SHADOWLINKSTATUS']._serialized_end = 4517 - _globals['_SHADOWLINKTASKSTATUS']._serialized_start = 4519 - _globals['_SHADOWLINKTASKSTATUS']._serialized_end = 4640 - _globals['_SHADOWTOPICSTATUS']._serialized_start = 4643 - _globals['_SHADOWTOPICSTATUS']._serialized_end = 4833 - _globals['_TOPICPARTITIONINFORMATION']._serialized_start = 4836 - _globals['_TOPICPARTITIONINFORMATION']._serialized_end = 4975 - _globals['_SHADOWLINKSERVICE']._serialized_start = 5831 - _globals['_SHADOWLINKSERVICE']._serialized_end = 6576 \ No newline at end of file + _globals['_SHADOWLINKCLIENTOPTIONS']._serialized_end = 2448 + _globals['_TOPICMETADATASYNCOPTIONS']._serialized_start = 2451 + _globals['_TOPICMETADATASYNCOPTIONS']._serialized_end = 2635 + _globals['_CONSUMEROFFSETSYNCOPTIONS']._serialized_start = 2638 + _globals['_CONSUMEROFFSETSYNCOPTIONS']._serialized_end = 2786 + _globals['_SECURITYSETTINGSSYNCOPTIONS']._serialized_start = 2789 + _globals['_SECURITYSETTINGSSYNCOPTIONS']._serialized_end = 3058 + _globals['_TLSSETTINGS']._serialized_start = 3061 + _globals['_TLSSETTINGS']._serialized_end = 3228 + _globals['_AUTHENTICATIONCONFIGURATION']._serialized_start = 3230 + _globals['_AUTHENTICATIONCONFIGURATION']._serialized_end = 3345 + _globals['_TLSFILESETTINGS']._serialized_start = 3347 + _globals['_TLSFILESETTINGS']._serialized_end = 3418 + _globals['_TLSPEMSETTINGS']._serialized_start = 3420 + _globals['_TLSPEMSETTINGS']._serialized_end = 3510 + _globals['_SCRAMCONFIG']._serialized_start = 3513 + _globals['_SCRAMCONFIG']._serialized_end = 3717 + _globals['_NAMEFILTER']._serialized_start = 3720 + _globals['_NAMEFILTER']._serialized_end = 3862 + _globals['_ACLFILTER']._serialized_start = 3865 + _globals['_ACLFILTER']._serialized_end = 4008 + _globals['_ACLRESOURCEFILTER']._serialized_start = 4011 + _globals['_ACLRESOURCEFILTER']._serialized_end = 4158 + _globals['_ACLACCESSFILTER']._serialized_start = 4161 + _globals['_ACLACCESSFILTER']._serialized_end = 4332 + _globals['_SHADOWLINKSTATUS']._serialized_start = 4335 + _globals['_SHADOWLINKSTATUS']._serialized_end = 4552 + _globals['_SHADOWLINKTASKSTATUS']._serialized_start = 4554 + _globals['_SHADOWLINKTASKSTATUS']._serialized_end = 4675 + _globals['_SHADOWTOPICSTATUS']._serialized_start = 4678 + _globals['_SHADOWTOPICSTATUS']._serialized_end = 4868 + _globals['_TOPICPARTITIONINFORMATION']._serialized_start = 4871 + _globals['_TOPICPARTITIONINFORMATION']._serialized_end = 5010 + _globals['_SHADOWLINKSERVICE']._serialized_start = 5866 + _globals['_SHADOWLINKSERVICE']._serialized_end = 6611 \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi index d245099fbb430..a27bc7a0e2345 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi @@ -492,6 +492,7 @@ class ShadowLinkClientOptions(google.protobuf.message.Message): FETCH_WAIT_MAX_MS_FIELD_NUMBER: builtins.int FETCH_MIN_BYTES_FIELD_NUMBER: builtins.int FETCH_MAX_BYTES_FIELD_NUMBER: builtins.int + FETCH_PARTITION_MAX_BYTES_FIELD_NUMBER: builtins.int client_id: builtins.str 'The Client ID for the Kafka RPC requests setn by this cluster to the\n source cluster\n ' source_cluster_id: builtins.str @@ -503,11 +504,13 @@ class ShadowLinkClientOptions(google.protobuf.message.Message): retry_backoff_ms: builtins.int 'Retry base backoff\n If 0 is provided, defaults to 100ms\n ' fetch_wait_max_ms: builtins.int - 'Fetch request timeout\n If 0 is provided, defaults to 100ms\n ' + 'Fetch request timeout\n If 0 is provided, defaults to 500ms\n ' fetch_min_bytes: builtins.int - 'Fetch min bytes\n If 0 is provided, defaults to 1 byte\n ' + 'Fetch min bytes\n If 0 is provided, defaults to 5 MiB\n ' fetch_max_bytes: builtins.int - 'Fetch max bytes\n If 0 is provided, defaults to 1MiB\n ' + 'Fetch max bytes\n If 0 is provided, defaults to 20 MiB\n ' + fetch_partition_max_bytes: builtins.int + 'Fetch partition max bytes\n If 0 is provided, defaults to 1 MiB\n ' @property def bootstrap_servers(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: @@ -521,13 +524,13 @@ class ShadowLinkClientOptions(google.protobuf.message.Message): def authentication_configuration(self) -> global___AuthenticationConfiguration: """Authentication settings""" - def __init__(self, *, bootstrap_servers: collections.abc.Iterable[builtins.str] | None=..., client_id: builtins.str=..., source_cluster_id: builtins.str=..., tls_settings: global___TLSSettings | None=..., authentication_configuration: global___AuthenticationConfiguration | None=..., metadata_max_age_ms: builtins.int=..., connection_timeout_ms: builtins.int=..., retry_backoff_ms: builtins.int=..., fetch_wait_max_ms: builtins.int=..., fetch_min_bytes: builtins.int=..., fetch_max_bytes: builtins.int=...) -> None: + def __init__(self, *, bootstrap_servers: collections.abc.Iterable[builtins.str] | None=..., client_id: builtins.str=..., source_cluster_id: builtins.str=..., tls_settings: global___TLSSettings | None=..., authentication_configuration: global___AuthenticationConfiguration | None=..., metadata_max_age_ms: builtins.int=..., connection_timeout_ms: builtins.int=..., retry_backoff_ms: builtins.int=..., fetch_wait_max_ms: builtins.int=..., fetch_min_bytes: builtins.int=..., fetch_max_bytes: builtins.int=..., fetch_partition_max_bytes: builtins.int=...) -> None: ... def HasField(self, field_name: typing.Literal['_authentication_configuration', b'_authentication_configuration', '_tls_settings', b'_tls_settings', 'authentication_configuration', b'authentication_configuration', 'tls_settings', b'tls_settings']) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal['_authentication_configuration', b'_authentication_configuration', '_tls_settings', b'_tls_settings', 'authentication_configuration', b'authentication_configuration', 'bootstrap_servers', b'bootstrap_servers', 'client_id', b'client_id', 'connection_timeout_ms', b'connection_timeout_ms', 'fetch_max_bytes', b'fetch_max_bytes', 'fetch_min_bytes', b'fetch_min_bytes', 'fetch_wait_max_ms', b'fetch_wait_max_ms', 'metadata_max_age_ms', b'metadata_max_age_ms', 'retry_backoff_ms', b'retry_backoff_ms', 'source_cluster_id', b'source_cluster_id', 'tls_settings', b'tls_settings']) -> None: + def ClearField(self, field_name: typing.Literal['_authentication_configuration', b'_authentication_configuration', '_tls_settings', b'_tls_settings', 'authentication_configuration', b'authentication_configuration', 'bootstrap_servers', b'bootstrap_servers', 'client_id', b'client_id', 'connection_timeout_ms', b'connection_timeout_ms', 'fetch_max_bytes', b'fetch_max_bytes', 'fetch_min_bytes', b'fetch_min_bytes', 'fetch_partition_max_bytes', b'fetch_partition_max_bytes', 'fetch_wait_max_ms', b'fetch_wait_max_ms', 'metadata_max_age_ms', b'metadata_max_age_ms', 'retry_backoff_ms', b'retry_backoff_ms', 'source_cluster_id', b'source_cluster_id', 'tls_settings', b'tls_settings']) -> None: ... @typing.overload diff --git a/tests/rptest/tests/cluster_linking_e2e_test.py b/tests/rptest/tests/cluster_linking_e2e_test.py index a7f959b389ee5..bd56b3f0f0352 100644 --- a/tests/rptest/tests/cluster_linking_e2e_test.py +++ b/tests/rptest/tests/cluster_linking_e2e_test.py @@ -360,10 +360,17 @@ def _any_topics_are_present_in_target_cluster(): ), ] ) - + shadow_link.configurations.client_options.fetch_wait_max_ms = 100 + shadow_link.configurations.client_options.fetch_min_bytes = 10 + shadow_link.configurations.client_options.fetch_partition_max_bytes = ( + 500 * 1024 * 1024 + ) update_mask: google.protobuf.field_mask_pb2.FieldMask = google.protobuf.field_mask_pb2.FieldMask( paths=[ - "configurations.topic_metadata_sync_options.auto_create_shadow_topic_filters" + "configurations.topic_metadata_sync_options.auto_create_shadow_topic_filters", + "configurations.client_options.fetch_partition_max_bytes", + "configurations.client_options.fetch_wait_max_ms", + "configurations.client_options.fetch_min_bytes", ] ) @@ -377,6 +384,12 @@ def _any_topics_are_present_in_target_cluster(): ), ( f"Expected updated link to be returned, {updated_link.configurations.topic_metadata_sync_options} != {shadow_link.configurations.topic_metadata_sync_options}" ) + assert ( + updated_link.configurations.client_options + == shadow_link.configurations.client_options + ), ( + f"Expected updated link to be returned, {updated_link.configurations.client_options} != {shadow_link.configurations.client_options}" + ) def _all_but_one_topic_are_present_in_target_cluster(): topics_in_target = {t for t in self.target_cluster_rpk.list_topics()}