diff --git a/src/v/cluster_link/BUILD b/src/v/cluster_link/BUILD index 00e0b6a5d350b..6eefeea3fbab5 100644 --- a/src/v/cluster_link/BUILD +++ b/src/v/cluster_link/BUILD @@ -185,6 +185,30 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "shadow_link_report_cache", + srcs = [ + "shadow_link_report_cache.cc", + ], + hdrs = [ + "shadow_link_report_cache.h", + ], + implementation_deps = [ + ":logger", + "//src/v/ssx:future_util", + ], + visibility = ["//visibility:public"], + deps = [ + ":cluster_link", + "//src/v/cluster_link/model", + "//src/v/config", + "//src/v/model", + "//src/v/ssx:semaphore", + "@abseil-cpp//absl/container:flat_hash_map", + "@seastar", + ], +) + redpanda_cc_library( name = "cluster_link", srcs = [ diff --git a/src/v/cluster_link/errc.cc b/src/v/cluster_link/errc.cc index b58ffcab8cfd6..51b95bdff9aa5 100644 --- a/src/v/cluster_link/errc.cc +++ b/src/v/cluster_link/errc.cc @@ -82,6 +82,10 @@ struct error_category final : public std::error_category { return "failed to stop task"; case errc::failed_to_pause_task: return "failed to pause task"; + case errc::report_generation_timed_out: + return "report generation timed out"; + case errc::report_generation_unknown_error: + return "report generation unknown error"; } return "(unknown error code)"; diff --git a/src/v/cluster_link/errc.h b/src/v/cluster_link/errc.h index 54aa59d15aac1..47270407375d2 100644 --- a/src/v/cluster_link/errc.h +++ b/src/v/cluster_link/errc.h @@ -49,6 +49,8 @@ enum class errc : int { link_verification_unknown_error, failed_to_stop_task, failed_to_pause_task, + report_generation_timed_out, + report_generation_unknown_error, }; std::error_code make_error_code(errc) noexcept; diff --git a/src/v/cluster_link/fwd.h b/src/v/cluster_link/fwd.h index df4e927ee9af7..597a514b7e3e8 100644 --- a/src/v/cluster_link/fwd.h +++ b/src/v/cluster_link/fwd.h @@ -15,4 +15,6 @@ namespace cluster_link { class link; class manager; class service; +class shadow_link_report_cache; +class default_shadow_link_report_fetcher; } // namespace cluster_link diff --git a/src/v/cluster_link/model/BUILD b/src/v/cluster_link/model/BUILD index 543f8d7ea5909..5dfd249410521 100644 --- a/src/v/cluster_link/model/BUILD +++ b/src/v/cluster_link/model/BUILD @@ -29,6 +29,7 @@ redpanda_cc_library( "//src/v/serde:optional", "//src/v/serde:variant", "//src/v/serde:vector", + "//src/v/ssx:async_algorithm", "//src/v/utils:absl_sstring_hash", "//src/v/utils:named_type", "//src/v/utils:unresolved_address", diff --git a/src/v/cluster_link/model/types.cc b/src/v/cluster_link/model/types.cc index 492df18f924f1..6011fd788667c 100644 --- a/src/v/cluster_link/model/types.cc +++ b/src/v/cluster_link/model/types.cc @@ -13,6 +13,7 @@ #include "base/format_to.h" #include "model/timestamp.h" +#include "ssx/async_algorithm.h" #include "utils/to_string.h" #include @@ -337,6 +338,20 @@ shadow_link_status_report_request::format_to(fmt::iterator it) const { return fmt::format_to(it, "{{ link_id: {} }}", link_id); } +ss::future +shadow_link_status_topic_response::copy() const { + shadow_link_status_topic_response copy; + copy.status = status; + copy.partition_reports.reserve(partition_reports.size()); + ssx::async_counter counter; + co_await ssx::async_for_each_counter( + counter, partition_reports, [&copy](const auto& pid_report) { + const auto& [pid, report] = pid_report; + copy.partition_reports.emplace(pid, report); + }); + co_return copy; +} + fmt::iterator shadow_link_status_topic_response::format_to(fmt::iterator it) const { return fmt::format_to( @@ -376,6
+391,23 @@ shadow_link_status_report_response::format_to(fmt::iterator it) const { } // namespace cluster_link::rpc namespace cluster_link::model { + +ss::future shadow_link_status_report::copy() const { + shadow_link_status_report copy; + copy.link_id = link_id; + + copy.topic_responses.reserve(topic_responses.size()); + for (const auto& [topic, response] : topic_responses) { + copy.topic_responses.emplace(topic, co_await response.copy()); + } + + copy.task_status_reports.reserve(task_status_reports.size()); + for (const auto& [task_name, reports] : task_status_reports) { + copy.task_status_reports.emplace(task_name, reports.copy()); + } + co_return copy; +} + fmt::iterator shadow_link_status_report::format_to(fmt::iterator it) const { return fmt::format_to( it, diff --git a/src/v/cluster_link/model/types.h b/src/v/cluster_link/model/types.h index 8510dae8d06ef..00db00bf7a088 100644 --- a/src/v/cluster_link/model/types.h +++ b/src/v/cluster_link/model/types.h @@ -1193,6 +1193,8 @@ struct shadow_link_status_topic_response fmt::iterator format_to(fmt::iterator) const; + ss::future copy() const; + auto serde_fields() { return std::tie(status, partition_reports); } }; @@ -1234,6 +1236,8 @@ struct shadow_link_status_report { chunked_hash_map> task_status_reports; + ss::future copy() const; + fmt::iterator format_to(fmt::iterator) const; }; diff --git a/src/v/cluster_link/service.h b/src/v/cluster_link/service.h index 06044fa9b37e3..df0b87fae0bc8 100644 --- a/src/v/cluster_link/service.h +++ b/src/v/cluster_link/service.h @@ -171,6 +171,8 @@ class service : public ss::peering_sharded_service { ss::future shadow_link_report(model::name_t name); + const manager& get_manager() const { return *_manager; } + private: void register_notifications(); void unregister_notifications(); diff --git a/src/v/cluster_link/shadow_link_report_cache.cc b/src/v/cluster_link/shadow_link_report_cache.cc new file mode 100644 index 0000000000000..a29df527e305e --- /dev/null +++ b/src/v/cluster_link/shadow_link_report_cache.cc @@ -0,0 +1,281 @@ +/* + * Copyright 2025 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "cluster_link/shadow_link_report_cache.h" + +#include "cluster_link/logger.h" +#include "cluster_link/service.h" +#include "ssx/future-util.h" + +#include +#include + +namespace cluster_link { + +ss::future +default_shadow_link_report_fetcher::fetch_link_report( + const model::name_t link_name, ss::abort_source&) { + return _shadow_service->local().shadow_link_report(link_name); +} + +shadow_link_report_cache::shadow_link_report_cache( + config::binding cache_ttl, + std::unique_ptr fetcher) + : _cache_ttl(std::move(cache_ttl)) + , _link_status_fetcher(std::move(fetcher)) { + if (ss::this_shard_id() != fetcher_shard) { + return; + } + _invalidator.set_callback([this]() { + ssx::spawn_with_gate( + _gate, [this] { return invalidate_stale_reports(); }); + }); +} + +ss::future<> shadow_link_report_cache::start() { + // Only start the refresh timer on shard 0 + if (ss::this_shard_id() == fetcher_shard) { + vlog( + cllog.debug, + "Starting shadow topic report cache with TTL: {}", + _cache_ttl()); + _invalidator.arm(_cache_ttl()); + } + co_return; +} + +ss::future<> shadow_link_report_cache::stop() { + vlog(cllog.debug, "Stopping shadow link report cache"); + _invalidator.cancel(); + _cached_cv.broken(); + _as.request_abort(); + co_await _gate.close(); + vlog(cllog.debug, "Stopped shadow link report cache"); +} + +ss::future<> shadow_link_report_cache::invalidate_stale_reports() noexcept { + vassert( + ss::this_shard_id() == fetcher_shard, + "invalidate_stale_reports must be called on fetcher shard"); + const auto now = cache_clock::now(); + const auto ttl = _cache_ttl(); + + chunked_vector to_invalidate; + to_invalidate.reserve(_cache.size()); + // collect a list of stale reports to invalidate + auto next_scheduled_time = now + ttl; + for (auto& [link_id, cached_report] : _cache) { + auto age = now - cached_report.last_refresh_time; + if (age >= ttl) { + to_invalidate.push_back(link_id); + } else { + next_scheduled_time = std::min( + next_scheduled_time, now + (_cache_ttl() - age)); + } + } + if (!to_invalidate.empty()) { + vlog( + cllog.debug, + "Invalidating stale shadow link reports: {}", + to_invalidate); + co_await container().invoke_on_all( + [&to_invalidate](auto& local) mutable { + for (const auto& link_id : to_invalidate) { + local.invalidate_link_report(link_id); + } + }); + } + if (_gate.is_closed()) { + co_return; + } + // reschedule invalidation timer + _invalidator.arm(next_scheduled_time); +} + +bool shadow_link_report_cache::has_valid_cached_report( + const model::name_t& link_name) const { + auto it = _cache.find(link_name); + if (it == _cache.end()) { + return false; + } + const auto& cached_report = it->second; + const auto now = cache_clock::now(); + const auto age = now - cached_report.last_refresh_time; + return age <= _cache_ttl() && cached_report.report != nullptr; +} + +void shadow_link_report_cache::maybe_dispatch_report_fetcher( + const model::name_t& name, std::chrono::milliseconds backoff) noexcept { + if (ss::this_shard_id() != fetcher_shard) { + ssx::spawn_with_gate(_gate, [this, name, backoff] { + return this->container().invoke_on( + fetcher_shard, [name, backoff](auto& local) { + local.maybe_dispatch_report_fetcher(name, backoff); + }); + }); + return; + } 
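+ // Past this point we are on the fetcher shard; the _in_flight_fetches + // set deduplicates concurrent requests so that at most one fetch per + // link is outstanding at any time.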
+ if (_gate.is_closed() || _in_flight_fetches.contains(name)) { + return; + } + _in_flight_fetches.insert(name); + ssx::spawn_with_gate(_gate, [this, name, backoff]() { + return do_dispatch_report_fetcher(name, backoff); + }); +} + +ss::future<> shadow_link_report_cache::do_dispatch_report_fetcher( + model::name_t link_id, std::chrono::milliseconds backoff) { + // Expected to be called on a fetcher shard and under a gate. + vassert(_in_flight_fetches.contains(link_id), "Fetch not marked in flight"); + vassert( + ss::this_shard_id() == fetcher_shard, + "do_dispatch_report_fetcher must be called on fetcher shard"); + auto failed = true; + vlog( + cllog.trace, + "Dispatching shadow link report fetcher for link {} with backoff {}", + link_id, + backoff); + try { + // prev fetch request failed, backoff before retrying + co_await ss::sleep_abortable(backoff, _as); + auto report = co_await _link_status_fetcher->fetch_link_report( + link_id, _as); + if (report.has_value()) { + // we have a new report, dispatch it to all shards. + auto report_data = std::move(report.value()); + co_await container().invoke_on_all( + [link_id, &report_data](auto& local) mutable { + return report_data.copy().then( + [&local, link_id](auto report) mutable { + local.update_local_report(link_id, std::move(report)); + }); + }); + failed = false; + } else if (!_gate.is_closed()) { + vlog( + cllog.warn, + "Could not fetch shadow link report for link {}: err: {}", + link_id, + report.error()); + // treat as failure to trigger retry logic + } + } catch (...) { + auto eptr = std::current_exception(); + auto log_level = ssx::is_shutdown_exception(eptr) ? ss::log_level::debug + : ss::log_level::warn; + vlogl( + cllog, + log_level, + "Failed to fetch shadow link report for link {}: {}", + link_id, + eptr); + } + _in_flight_fetches.erase(link_id); + // schedule another fetch if needed + // sometimes there is a competing force refresh that may have updated the + // cache while we were fetching, in which case we will schedule a new fetch + // without backoff, right away. 
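+ // On repeated failures the retry delay doubles, clamped to the + // [min_backoff, max_backoff] range (100ms up to 10s with the constants + // declared in the header).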
+ auto schedule_another = (failed || !has_valid_cached_report(link_id)) + && !_gate.is_closed(); + if (schedule_another) { + auto next_backoff = no_backoff; + if (failed) { + next_backoff = std::min( + std::max(backoff * 2, min_backoff), max_backoff); + } + maybe_dispatch_report_fetcher(link_id, next_backoff); + } + _cached_cv.broadcast(); +} + +void shadow_link_report_cache::update_local_report( + const model::name_t& link_name, + std::optional report) noexcept { + if (_gate.is_closed()) { + return; + } + vlog( + cllog.trace, + "Updating local shadow link report for link {}: {}", + link_name, + report); + _cache.erase(link_name); + if (!report.has_value()) { + return; + } + _cache.emplace(link_name, cached_link_report{std::move(report.value())}); + _cached_cv.broadcast(); +} + +void shadow_link_report_cache::invalidate_link_report( + const model::name_t& link_name) { + update_local_report(link_name, std::nullopt); +} + +ss::future +shadow_link_report_cache::get_report( + const model::name_t& link_name, + std::chrono::milliseconds timeout, + bool force_refresh) { + auto holder = _gate.hold(); + if (force_refresh) { + invalidate_link_report(link_name); + } + if (has_valid_cached_report(link_name)) { + auto report_it = _cache.find(link_name); + return ssx::now(report_it->second.report); + } + maybe_dispatch_report_fetcher(link_name, no_backoff); + return _cached_cv + .wait( + timeout, + [this, link_name]() { return has_valid_cached_report(link_name); }) + .then([this, link_name] { + auto report_it = _cache.find(link_name); + if (report_it != _cache.end()) { + return ssx::now(report_it->second.report); + } + return ssx::now( + std::unexpected(errc::report_generation_unknown_error)); + }) + .handle_exception_type( + [link_name](const ss::condition_variable_timed_out&) { + vlog( + cllog.warn, + "Timeout waiting for shadow link report fetch for link {}", + link_name); + return ssx::now( + std::unexpected(errc::report_generation_timed_out)); + }) + .handle_exception_type([link_name](const ss::broken_condition_variable&) { + vlog( + cllog.debug, + "Shadow link report fetch aborted for link {} due to cache " + "shutdown", + link_name); + return ssx::now( + std::unexpected(errc::service_shutting_down)); + }) + .handle_exception_type([link_name](const ss::gate_closed_exception&) { + vlog( + cllog.debug, + "Shadow link report fetch aborted for link {} due to cache " + "shutdown", + link_name); + return ssx::now( + std::unexpected(errc::service_shutting_down)); + }) + .finally([holder = std::move(holder)] {}); +} + +} // namespace cluster_link diff --git a/src/v/cluster_link/shadow_link_report_cache.h b/src/v/cluster_link/shadow_link_report_cache.h new file mode 100644 index 0000000000000..749a8d32b8804 --- /dev/null +++ b/src/v/cluster_link/shadow_link_report_cache.h @@ -0,0 +1,164 @@ +/* + * Copyright 2025 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "absl/container/flat_hash_map.h" +#include "cluster_link/model/types.h" +#include "config/property.h" + +#include +#include +#include +#include +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +namespace cluster_link { + +class service; + +using link_status_ptr + = ss::lw_shared_ptr; + +using cache_clock = ss::lowres_clock; + +/** + * Interface for fetching shadow link reports. + * This abstraction allows easy testing and pluggable report generation logic. + */ +class shadow_link_report_fetcher { +public: + virtual ~shadow_link_report_fetcher() = default; + + /** + * Fetch a shadow_link_status_report for a given link. + * This should aggregate reports from all shards on the local node. + * + * @param link_id The link to fetch the report for + * @return A future that resolves to the link's shadow_link_status_report, + * or an errc on failure + */ + virtual ss::future + fetch_link_report(model::name_t link_id, ss::abort_source&) = 0; +}; + +class default_shadow_link_report_fetcher : public shadow_link_report_fetcher { +public: + explicit default_shadow_link_report_fetcher( + ss::sharded* shadow_service) + : _shadow_service(shadow_service) {} + ss::future + fetch_link_report(model::name_t link_name, ss::abort_source& as) override; + +private: + ss::sharded* _shadow_service; +}; + +/** + * Per-link cached report data. + * Uses copy-on-write semantics with lw_shared_ptr for efficient sharing. + */ +struct cached_link_report { + // The cached report (immutable once set) + link_status_ptr report; + + // When this report was last refreshed + ss::lowres_clock::time_point last_refresh_time; + + explicit cached_link_report(model::shadow_link_status_report r) + : report( + ss::make_lw_shared( + std::move(r))) + , last_refresh_time(cache_clock::now()) {} +}; + +/** + * Read-through cache for shadow link status reports. + * + * Key design features: + * 1. Invalidates cached entries once they are older than `cache_ttl` + * (a cluster config) + * 2. Invalidation runs on shard 0, results copied to all shards + * 3. Copy-on-write with ss::lw_shared_ptr for efficient sharing + * 4. Organized by link (one report per link) + * 5. Supports get_report(link_name, timeout) with optional force_refresh + * 6. Pluggable report fetcher interface for testing + */ +class shadow_link_report_cache + : public ss::peering_sharded_service { +public: + shadow_link_report_cache( + config::binding cache_ttl, + std::unique_ptr fetcher); + + /** + * Start the cache. If running on shard 0, starts the periodic + * invalidation timer. + */ + ss::future<> start(); + + /** + * Stop the cache and clean up resources. + */ + ss::future<> stop(); + + /** + * Get the cached report for a specific link, fetching a fresh one on + * demand if no valid cached entry exists. + * + * @param link_name The link to get the report for + * @param timeout How long to wait for a report to become available + * @param force_refresh If true, bypass the cache and force a fresh fetch + * @return A future that resolves to the link's report, or an errc if the + * fetch timed out or the cache is shutting down
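+ * + * A usage sketch (assuming a started, sharded cache instance; use() and + * handle() stand in for caller code): + * + * auto r = co_await cache.local().get_report(link_name, 5s); + * if (r.has_value()) { + * use(r.value()); // shared pointer to the immutable report + * } else { + * handle(r.error()); // e.g. errc::report_generation_timed_out + * }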
+ */ + using report_return_t = std::expected; + ss::future get_report( + const model::name_t& link_name, + std::chrono::milliseconds timeout, + bool force_refresh = false); + +private: + static constexpr auto fetcher_shard = ss::shard_id{0}; + static constexpr auto no_backoff = 0ms; + static constexpr auto initial_backoff = 100ms; + static constexpr auto min_backoff = 100ms; + static constexpr auto max_backoff = 10000ms; // 10s + + bool has_valid_cached_report(const model::name_t& link_name) const; + void invalidate_link_report(const model::name_t& link_name); + void maybe_dispatch_report_fetcher( + const model::name_t& link_name, + std::chrono::milliseconds backoff) noexcept; + ss::future<> do_dispatch_report_fetcher( + model::name_t link_name, std::chrono::milliseconds backoff); + ss::future<> invalidate_stale_reports() noexcept; + void update_local_report( + const model::name_t& link_name, + std::optional) noexcept; + + config::binding _cache_ttl; + std::unique_ptr _link_status_fetcher; + // Periodic invalidation timer (only active on shard 0) + ss::timer _invalidator; + // Per-link cached reports + // Key: link name, Value: cached report with metadata + absl::flat_hash_map _cache; + absl::flat_hash_set _in_flight_fetches; + ss::condition_variable _cached_cv; + ss::gate _gate; + ss::abort_source _as; +}; + +} // namespace cluster_link diff --git a/src/v/cluster_link/tests/BUILD b/src/v/cluster_link/tests/BUILD index ae60b4210953c..d3c082a053c34 100644 --- a/src/v/cluster_link/tests/BUILD +++ b/src/v/cluster_link/tests/BUILD @@ -188,3 +188,23 @@ redpanda_cc_gtest( "@seastar//:testing", ], ) + +redpanda_cc_gtest( + name = "shadow_link_report_cache_test", + timeout = "short", + srcs = [ + "shadow_link_report_cache_test.cc", + ], + cpu = 2, + deps = [ + "//src/v/cluster_link:shadow_link_report_cache", + "//src/v/cluster_link/model", + "//src/v/config", + "//src/v/model", + "//src/v/test_utils:gtest", + "//src/v/test_utils:random", + "@googletest//:gtest", + "@seastar", + "@seastar//:testing", + ], +) diff --git a/src/v/cluster_link/tests/shadow_link_report_cache_test.cc b/src/v/cluster_link/tests/shadow_link_report_cache_test.cc new file mode 100644 index 0000000000000..ff42bdf2f1973 --- /dev/null +++ b/src/v/cluster_link/tests/shadow_link_report_cache_test.cc @@ -0,0 +1,489 @@ +/* + * Copyright 2025 Redpanda Data, Inc.
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "cluster_link/model/types.h" +#include "cluster_link/shadow_link_report_cache.h" +#include "config/mock_property.h" +#include "config/property.h" +#include "model/fundamental.h" +#include "test_utils/randoms.h" +#include "test_utils/test.h" + +#include +#include +#include +#include + +#include + +#include + +using namespace cluster_link; +using namespace std::chrono_literals; + +namespace { + +// Mock fetcher for testing that simulates delays +class mock_report_fetcher : public shadow_link_report_fetcher { +public: + // Track how many times fetch was called + int fetch_count{0}; + + // Control whether fetches succeed or fail + bool should_fail{false}; + + // Configurable delay to simulate slow fetches + std::chrono::milliseconds fetch_delay{0ms}; + + // Last link_name that was fetched + cluster_link::model::name_t last_fetched_link_name{""}; + + ss::future fetch_link_report( + cluster_link::model::name_t link_name, ss::abort_source& as) override { + fetch_count++; + last_fetched_link_name = link_name; + + // Simulate fetch delay + if (fetch_delay > 0ms) { + co_await ss::sleep_abortable(fetch_delay, as); + } + + if (should_fail) { + co_return std::unexpected( + cluster_link::errc::failed_to_connect_to_remote_cluster); + } + + // Create a fake report with some data + cluster_link::model::shadow_link_status_report response; + // Note: link_id is still id_t in the report structure, but we receive + // name_t For testing purposes, use a numeric ID derived from the name + response.link_id = cluster_link::model::id_t{ + static_cast(std::hash{}(link_name()))}; + + // Add a fake topic with partition reports + ::model::topic test_topic{ssx::sformat("test_topic_{}", link_name())}; + cluster_link::rpc::shadow_link_status_topic_response topic_response; + topic_response.status + = cluster_link::model::mirror_topic_status::active; + + // Add a fake partition report + cluster_link::rpc::shadow_topic_partition_leader_report + partition_report; + partition_report.partition = ::model::partition_id{0}; + partition_report.source_partition_start_offset = kafka::offset{0}; + partition_report.source_partition_high_watermark = kafka::offset{100}; + partition_report.source_partition_last_stable_offset = kafka::offset{ + 100}; + partition_report.shadow_partition_high_watermark = kafka::offset{90}; + partition_report.last_update_time = std::chrono::milliseconds{ + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count()}; + + topic_response.partition_reports[::model::partition_id{0}] + = partition_report; + + response.topic_responses.emplace(test_topic, std::move(topic_response)); + + co_return response; + } +}; + +// Test fixture for shadow_link_report_cache tests +class shadow_topic_report_cache_test : public seastar_test { +protected: + ss::future<> SetUpAsync() override { + vassert(ss::smp::count > 1, "Tests require multiple shards"); + // Create mock fetcher with configurable delay + // Create cache with 1 hour TTL (long enough to avoid expiration during + // tests). Pass the fetcher only on shard 0, other shards get nullptr. 
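+ // NB: fetcher_ptr is only assigned on shard 0; the test bodies also run + // on shard 0, so it is safe to dereference there.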
+ co_await ttl_config.start(std::chrono::milliseconds(1h)); + co_await cache.start( + ss::sharded_parameter([this] { return ttl_config.local().bind(); }), + ss::sharded_parameter( + [this]() -> std::unique_ptr { + // Only shard 0 gets the fetcher, other shards get nullptr + if (ss::this_shard_id() == 0) { + auto fetcher = std::make_unique(); + fetcher_ptr = fetcher.get(); + return fetcher; + } + return std::unique_ptr{}; + })); + // Start cache on all shards + co_await cache.invoke_on_all(&shadow_link_report_cache::start); + } + + ss::future<> TearDownAsync() override { + co_await cache.stop(); + co_await ttl_config.stop(); + } + + ss::sharded> ttl_config; + + ss::sharded cache; + std::unique_ptr fetcher; + mock_report_fetcher* fetcher_ptr{nullptr}; +}; + +} // anonymous namespace + +TEST_F_CORO(shadow_topic_report_cache_test, single_shard_fetch) { + // Configure fetcher with a small delay to simulate fetching + fetcher_ptr->fetch_delay = 50ms; + + cluster_link::model::name_t link_name{"test_link_42"}; + + // Measure time before fetch + auto start_time = std::chrono::steady_clock::now(); + + // First get should trigger a fetch + auto report_result = co_await cache.local().get_report( + link_name, 5s, false); + + auto elapsed_time = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_time); + + // Verify fetch was called + EXPECT_EQ(fetcher_ptr->fetch_count, 1); + EXPECT_EQ(fetcher_ptr->last_fetched_link_name, link_name); + + // Verify report is valid + ASSERT_TRUE_CORO(report_result.has_value()); + auto report = report_result.value(); + EXPECT_TRUE(report); + if (report) { + EXPECT_FALSE(report->topic_responses.empty()); + } + + // Verify fetch delay was respected (should be at least 50ms) + EXPECT_GE(elapsed_time.count(), 50); + + // Second get should use cache (no additional delay) + start_time = std::chrono::steady_clock::now(); + auto report2_result = co_await cache.local().get_report( + link_name, 5s, false); + elapsed_time = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_time); + + // Verify no additional fetch + EXPECT_EQ(fetcher_ptr->fetch_count, 1); + + // Verify cached response is fast (should be much less than 50ms) + EXPECT_LT(elapsed_time.count(), 20); + + // Verify we got the same cached report + ASSERT_TRUE_CORO(report2_result.has_value()) + << "Second report fetch failed"; + auto report2 = report2_result.value(); + ASSERT_TRUE_CORO(report2); + EXPECT_EQ(report.get(), report2.get()); // Same pointer = COW working +} + +TEST_F_CORO(shadow_topic_report_cache_test, concurrent_fetches) { + // Configure fetcher with delay to simulate fetching + fetcher_ptr->fetch_delay = 100ms; + + // Create multiple link names to fetch concurrently + cluster_link::model::name_t link1{"link_1"}; + cluster_link::model::name_t link2{"link_2"}; + cluster_link::model::name_t link3{"link_3"}; + + auto start_time = std::chrono::steady_clock::now(); + + // Issue concurrent fetches for different links + auto f1 = cache.local().get_report(link1, 5s, false); + auto f2 = cache.local().get_report(link2, 5s, false); + auto f3 = cache.local().get_report(link3, 5s, false); + + auto [result1, result2, result3] = co_await ss::when_all_succeed( + std::move(f1), std::move(f2), std::move(f3)); + + auto elapsed_time = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_time); + + // Verify all fetches completed + EXPECT_EQ(fetcher_ptr->fetch_count, 3); + + // Verify all reports are valid + ASSERT_TRUE_CORO(result1.has_value()); + 
ASSERT_TRUE_CORO(result2.has_value()); + ASSERT_TRUE_CORO(result3.has_value()); + + auto report1 = result1.value(); + auto report2 = result2.value(); + auto report3 = result3.value(); + + ASSERT_TRUE_CORO(report1); + ASSERT_TRUE_CORO(report2); + ASSERT_TRUE_CORO(report3); + + // Test concurrent requests for the SAME link (deduplication) + fetcher_ptr->fetch_count = 0; + cluster_link::model::name_t link4{"link_4"}; + + start_time = std::chrono::steady_clock::now(); + + // Issue multiple concurrent requests for the same link + auto f4_1 = cache.local().get_report(link4, 5s, false); + auto f4_2 = cache.local().get_report(link4, 5s, false); + auto f4_3 = cache.local().get_report(link4, 5s, false); + + auto [result4_1, result4_2, result4_3] = co_await ss::when_all_succeed( + std::move(f4_1), std::move(f4_2), std::move(f4_3)); + + elapsed_time = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_time); + + // All should get the same report + ASSERT_TRUE_CORO(result4_1.has_value()) << "report4_1 failed"; + ASSERT_TRUE_CORO(result4_2.has_value()) << "report4_2 failed"; + ASSERT_TRUE_CORO(result4_3.has_value()) << "report4_3 failed"; + + auto report4_1 = result4_1.value(); + auto report4_2 = result4_2.value(); + auto report4_3 = result4_3.value(); + + ASSERT_TRUE_CORO(report4_1); + ASSERT_TRUE_CORO(report4_2); + ASSERT_TRUE_CORO(report4_3); + + // Should only fetch once due to deduplication + EXPECT_EQ(fetcher_ptr->fetch_count, 1); + + // Time should be ~100ms (single fetch), not 300ms + EXPECT_LT(elapsed_time.count(), 150); +} + +TEST_F_CORO(shadow_topic_report_cache_test, force_refresh) { + fetcher_ptr->fetch_delay = 30ms; + + cluster_link::model::name_t link_name{"test_link_force_refresh"}; + + // First get + auto result1 = co_await cache.local().get_report(link_name, 5s, false); + EXPECT_EQ(fetcher_ptr->fetch_count, 1); + ASSERT_TRUE_CORO(result1.has_value()); + auto report1 = result1.value(); + ASSERT_TRUE_CORO(report1); + + // Force refresh should trigger a new fetch + auto result2 = co_await cache.local().get_report(link_name, 5s, true); + EXPECT_EQ(fetcher_ptr->fetch_count, 2); + ASSERT_TRUE_CORO(result2.has_value()); + auto report2 = result2.value(); + ASSERT_TRUE_CORO(report2); + + // Reports should be different instances (new fetch) + EXPECT_NE(report1.get(), report2.get()); +} + +TEST_F_CORO(shadow_topic_report_cache_test, fetch_timeout) { + // Configure a very long delay that will exceed timeout + fetcher_ptr->fetch_delay = 10s; + + cluster_link::model::name_t link_name{"test_link_timeout"}; + + // Try to get with a short timeout - should return error + auto result = co_await cache.local().get_report(link_name, 500ms, false); + EXPECT_FALSE(result.has_value()); + if (!result.has_value()) { + EXPECT_EQ( + result.error(), cluster_link::errc::report_generation_timed_out); + } +} + +TEST_F_CORO(shadow_topic_report_cache_test, fetch_error) { + fetcher_ptr->should_fail = true; + fetcher_ptr->fetch_delay = 50ms; + + cluster_link::model::name_t link_name{"test_link_error"}; + + // First attempt should fail and start retries + auto result = co_await cache.local().get_report(link_name, 200ms, false); + + // Should return error since fetch failed + EXPECT_FALSE(result.has_value()); + + // Verify fetch was attempted (may retry during timeout window) + EXPECT_GT(fetcher_ptr->fetch_count, 0); +} + +TEST_F_CORO(shadow_topic_report_cache_test, cached_response_performance) { + fetcher_ptr->fetch_delay = 100ms; + + cluster_link::model::name_t link_name{"test_link_perf"}; + + // First 
fetch - should take ~100ms + auto start = std::chrono::steady_clock::now(); + auto result1 = co_await cache.local().get_report(link_name, 5s, false); + auto duration1 = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start); + + EXPECT_GE(duration1.count(), 100); + EXPECT_EQ(fetcher_ptr->fetch_count, 1); + ASSERT_TRUE_CORO(result1.has_value()); + auto report1 = result1.value(); + + // Second fetch - should be cached and fast + start = std::chrono::steady_clock::now(); + auto result2 = co_await cache.local().get_report(link_name, 5s, false); + auto duration2 = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start); + + EXPECT_LT(duration2.count(), 20); + EXPECT_EQ(fetcher_ptr->fetch_count, 1); // No additional fetch + ASSERT_TRUE_CORO(result2.has_value()); + auto report2 = result2.value(); + EXPECT_EQ(report1.get(), report2.get()); // Same cached object +} + +TEST_F_CORO(shadow_topic_report_cache_test, multiple_links_independent_caches) { + fetcher_ptr->fetch_delay = 50ms; + + cluster_link::model::name_t link1{"link_alpha"}; + cluster_link::model::name_t link2{"link_beta"}; + + // Fetch for link1 + auto result1 = co_await cache.local().get_report(link1, 5s, false); + EXPECT_EQ(fetcher_ptr->fetch_count, 1); + ASSERT_TRUE_CORO(result1.has_value()); + auto report1 = result1.value(); + ASSERT_TRUE_CORO(report1); + + // Fetch for link2 - should trigger new fetch + auto result2 = co_await cache.local().get_report(link2, 5s, false); + EXPECT_EQ(fetcher_ptr->fetch_count, 2); + ASSERT_TRUE_CORO(result2.has_value()); + auto report2 = result2.value(); + ASSERT_TRUE_CORO(report2); + + // Fetch link1 again - should use cache + auto result1_cached = co_await cache.local().get_report(link1, 5s, false); + EXPECT_EQ(fetcher_ptr->fetch_count, 2); // No new fetch + ASSERT_TRUE_CORO(result1_cached.has_value()); + auto report1_cached = result1_cached.value(); + EXPECT_EQ(report1.get(), report1_cached.get()); + + // Fetch link2 again - should use cache + auto result2_cached = co_await cache.local().get_report(link2, 5s, false); + EXPECT_EQ(fetcher_ptr->fetch_count, 2); // No new fetch + ASSERT_TRUE_CORO(result2_cached.has_value()); + auto report2_cached = result2_cached.value(); + EXPECT_EQ(report2.get(), report2_cached.get()); +} + +TEST_F_CORO(shadow_topic_report_cache_test, multi_shard_fetch) { + fetcher_ptr->fetch_delay = 50ms; + cluster_link::model::name_t link_name{"test_link_multishard"}; + + static constexpr auto other_shard = ss::shard_id{1}; + // issue a fetch on shard 1, should be routed to shard0 + co_await cache.invoke_on(other_shard, [link_name](auto& other) { + return other.get_report(link_name, 5s, false).then([](auto result) { + // Verify report is valid + EXPECT_TRUE(result.has_value()); + auto& report = result.value(); + + EXPECT_TRUE(report); + if (report) { + EXPECT_FALSE(report->topic_responses.empty()); + } + return ss::now(); + }); + }); + + // Verify fetch was called + EXPECT_EQ(fetcher_ptr->fetch_count, 1); + EXPECT_EQ(fetcher_ptr->last_fetched_link_name, link_name); + // Second get on shard 0 / 1 should use cache (no additional delay) + auto result2 = co_await cache.local().get_report(link_name, 5s, false); + // Verify no additional fetch + EXPECT_EQ(fetcher_ptr->fetch_count, 1); + // Verify we got the same cached report + ASSERT_TRUE_CORO(result2.has_value()) << "Second report fetch failed"; + auto report2 = result2.value(); + ASSERT_TRUE_CORO(report2); +} + +TEST_F_CORO(shadow_topic_report_cache_test, multi_shard_fuzz) { + // Configure fetcher with 
random delays + fetcher_ptr->fetch_delay = 10ms; + + // Test parameters + static constexpr auto test_duration = 5s; + static constexpr auto num_links = 5; + constexpr auto timeout_ms = 2s; + + auto start_time = std::chrono::steady_clock::now(); + // Track statistics + size_t successful_gets{0}; + size_t failed_gets{0}; + size_t total_operations{0}; + + ss::gate g; + + // Run fuzz test for 5 seconds + while (std::chrono::steady_clock::now() - start_time < test_duration) { + // Randomize parameters + auto link_name = cluster_link::model::name_t{ssx::sformat( + "fuzz_link_{}", random_generators::get_int(0, num_links))}; + auto target_shard = tests::random_bool() ? ss::shard_id{0} + : ss::shard_id{1}; + bool force_refresh = tests::random_bool(); // 50% chance + auto operation_delay = std::chrono::milliseconds{ + random_generators::get_int(0, 100)}; + + total_operations++; + + // Introduce random delay before operation + if (operation_delay > 0ms) { + co_await ss::sleep(operation_delay); + } + + // Launch operation on random shard + (void)cache + .invoke_on( + target_shard, + [link_name, force_refresh, timeout_ms](auto& cache_instance) { + return cache_instance + .get_report(link_name, timeout_ms, force_refresh) + .then([](auto result) { return result.has_value(); }); + }) + .then([&successful_gets, &failed_gets](bool was_success) { + if (was_success) { + successful_gets++; + } else { + failed_gets++; + } + }) + .finally([holder = g.hold()] {}); + + // Occasionally randomize the fetcher delay + if (random_generators::get_int(0, 100) >= 80) { + auto delay = std::chrono::milliseconds{ + random_generators::get_int(0, 200)}; + fetcher_ptr->fetch_delay = std::chrono::milliseconds{delay}; + } + } + + // Wait for all operations to complete + co_await g.close(); + // Verify test ran successfully + auto total_gets = successful_gets + failed_gets; + EXPECT_GT(total_gets, 0) << "No operations completed"; + EXPECT_GE(total_operations, total_gets); + // Most operations should succeed (some may timeout due to random delays) + auto success_rate = static_cast(successful_gets) / total_gets; + EXPECT_GT(success_rate, 0.5) + << "Success rate too low: " << (success_rate * 100) << "%"; +} diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 866f2bb9f21fd..683141fc485af 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -4495,6 +4495,14 @@ configuration::configuration() "Default timeout for RPC requests between Redpanda nodes.", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, 10s) + , shadow_link_report_caching_ttl_ms( + *this, + "shadow_link_report_caching_ttl_ms", + "The duration in milliseconds for which shadow link status reports are 
" + "todo: add more details", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 5000ms, + {.min = 100ms, .max = std::chrono::milliseconds(1h)}) , cloud_topics_enabled( *this, true, diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 625c06d4a8f5f..ce30aafc893e9 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -811,6 +811,9 @@ struct configuration final : public config_store { enterprise> enable_shadow_linking; property internal_rpc_request_timeout_ms; + bounded_property + shadow_link_report_caching_ttl_ms; + configuration(); error_map_t load(const YAML::Node& root_node); diff --git a/src/v/redpanda/BUILD b/src/v/redpanda/BUILD index 5fdbc2c4d3457..ffeb8e1be87ec 100644 --- a/src/v/redpanda/BUILD +++ b/src/v/redpanda/BUILD @@ -41,6 +41,7 @@ redpanda_cc_library( "//src/v/cluster/utils:partition_change_notifier_impl", "//src/v/cluster_link:fwd", "//src/v/cluster_link:rpc_service", + "//src/v/cluster_link:shadow_link_report_cache", "//src/v/compression", "//src/v/config", "//src/v/crash_tracker", diff --git a/src/v/redpanda/admin/services/shadow_link/BUILD b/src/v/redpanda/admin/services/shadow_link/BUILD index 1bd44f14a4794..902d9f4b35b11 100644 --- a/src/v/redpanda/admin/services/shadow_link/BUILD +++ b/src/v/redpanda/admin/services/shadow_link/BUILD @@ -46,6 +46,7 @@ redpanda_cc_library( "//proto/redpanda/core/admin/v2:shadow_link_redpanda_proto", "//src/v/cluster", "//src/v/cluster_link:fwd", + "//src/v/cluster_link:shadow_link_report_cache", "//src/v/cluster_link/model", "//src/v/redpanda/admin/proxy:client", "//src/v/serde/protobuf:rpc", diff --git a/src/v/redpanda/admin/services/shadow_link/err.cc b/src/v/redpanda/admin/services/shadow_link/err.cc index 00eb82877e86f..9b0c0663b4c94 100644 --- a/src/v/redpanda/admin/services/shadow_link/err.cc +++ b/src/v/redpanda/admin/services/shadow_link/err.cc @@ -31,6 +31,8 @@ void handle_error(cluster_link::errc err, ss::sstring info) { case cluster_link::errc::topic_metadata_stale: case cluster_link::errc::failed_to_stop_task: case cluster_link::errc::failed_to_pause_task: + case cluster_link::errc::report_generation_timed_out: + case cluster_link::errc::report_generation_unknown_error: throw serde::pb::rpc::internal_exception(std::move(info)); case cluster_link::errc::failed_to_connect_to_remote_cluster: case cluster_link::errc::remote_cluster_does_not_support_required_api: diff --git a/src/v/redpanda/admin/services/shadow_link/shadow_link.cc b/src/v/redpanda/admin/services/shadow_link/shadow_link.cc index 84ca468135c0c..c7e12b7ca256f 100644 --- a/src/v/redpanda/admin/services/shadow_link/shadow_link.cc +++ b/src/v/redpanda/admin/services/shadow_link/shadow_link.cc @@ -13,6 +13,7 @@ #include "cluster/metadata_cache.h" #include "cluster_link/service.h" +#include "cluster_link/shadow_link_report_cache.h" #include "redpanda/admin/services/shadow_link/converter.h" #include "redpanda/admin/services/shadow_link/err.h" #include "redpanda/admin/services/utils.h" @@ -21,12 +22,16 @@ namespace admin { ss::logger sllog("shadow_link_service"); +static constexpr auto timeout = 1min; + shadow_link_service_impl::shadow_link_service_impl( admin::proxy::client proxy_client, ss::sharded* service, + ss::sharded* shadow_link_report_cache, ss::sharded* md_cache) : _proxy_client(std::move(proxy_client)) , _service(service) + , _shadow_link_report_cache(shadow_link_report_cache) , _md_cache(md_cache) {} ss::future @@ -173,11 +178,12 @@ shadow_link_service_impl::update_shadow_link( 
link_name, std::move(update_cmd))); auto status_report = handle_error( - co_await _service->local().shadow_link_report(link_name)); + co_await _shadow_link_report_cache->local().get_report( + link_name, timeout, true)); proto::admin::update_shadow_link_response resp; - resp.set_shadow_link( - metadata_to_shadow_link(std::move(updated_md), std::move(status_report))); + resp.set_shadow_link(metadata_to_shadow_link( + std::move(updated_md), co_await status_report->copy())); co_return resp; } @@ -202,7 +208,8 @@ shadow_link_service_impl::fail_over( auto current_link = handle_error( _service->local().get_cluster_link(link_name)); auto status_report = handle_error( - co_await _service->local().shadow_link_report(link_name)); + co_await _shadow_link_report_cache->local().get_report( + link_name, timeout, true)); const auto& failover_topic = req.get_shadow_topic_name(); proto::admin::fail_over_response resp; if (failover_topic.empty()) { @@ -210,8 +217,8 @@ shadow_link_service_impl::fail_over( auto result = handle_error( co_await _service->local().failover_link_topics( std::move(link_name))); - resp.set_shadow_link( - metadata_to_shadow_link(std::move(result), std::move(status_report))); + resp.set_shadow_link(metadata_to_shadow_link( + std::move(result), co_await status_report->copy())); } else { // failover the specific shadow topic auto topic = model::topic{failover_topic}; @@ -220,8 +227,8 @@ shadow_link_service_impl::fail_over( std::move(link_name), std::move(topic), cluster_link::model::mirror_topic_status::failing_over)); - resp.set_shadow_link( - metadata_to_shadow_link(std::move(result), std::move(status_report))); + resp.set_shadow_link(metadata_to_shadow_link( + std::move(result), co_await status_report->copy())); } co_return resp; } @@ -320,8 +327,10 @@ shadow_link_service_impl::build_shadow_link( auto md = handle_error( _service->local().get_cluster_link(shadow_link_name)); auto link_status = handle_error( - co_await _service->local().shadow_link_report(shadow_link_name)); + co_await _shadow_link_report_cache->local().get_report( + shadow_link_name, timeout, false)); - co_return metadata_to_shadow_link(std::move(md), std::move(link_status)); + co_return metadata_to_shadow_link( + std::move(md), co_await link_status->copy()); } } // namespace admin diff --git a/src/v/redpanda/admin/services/shadow_link/shadow_link.h b/src/v/redpanda/admin/services/shadow_link/shadow_link.h index f4ba0303c7e7b..ab61ff0691cf7 100644 --- a/src/v/redpanda/admin/services/shadow_link/shadow_link.h +++ b/src/v/redpanda/admin/services/shadow_link/shadow_link.h @@ -25,6 +25,8 @@ class shadow_link_service_impl : public proto::admin::shadow_link_service { shadow_link_service_impl( admin::proxy::client proxy_client, ss::sharded* service, + ss::sharded* + shadow_link_report_cache, ss::sharded* md_cache); ss::future create_shadow_link( @@ -76,6 +78,8 @@ class shadow_link_service_impl : public proto::admin::shadow_link_service { admin::proxy::client _proxy_client; ss::sharded* _service; + ss::sharded* + _shadow_link_report_cache; ss::sharded* _md_cache; }; } // namespace admin diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 9e95d454b7f09..9a093bd7876f7 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -192,6 +192,8 @@ class application : public ssx::sharded_service_container { std::unique_ptr cloud_topics_app; ss::sharded _cluster_link_service; + ss::sharded + _shadow_link_report_cache; const std::unique_ptr& schema_registry() { return _schema_registry; 
diff --git a/src/v/redpanda/application_admin.cc b/src/v/redpanda/application_admin.cc index 1de5cb5588afb..fa2e8ba5b2516 100644 --- a/src/v/redpanda/application_admin.cc +++ b/src/v/redpanda/application_admin.cc @@ -81,7 +81,10 @@ void application::configure_admin_server(model::node_id node_id) { // Add RPC services s.add_service( std::make_unique( - create_client(), &_cluster_link_service, &metadata_cache)); + create_client(), + &_cluster_link_service, + &_shadow_link_report_cache, + &metadata_cache)); s.add_service( std::make_unique( create_client(), stress_fiber_manager)); diff --git a/src/v/redpanda/application_runtime.cc b/src/v/redpanda/application_runtime.cc index 3c7a379bdcc7f..961ccd1ac158a 100644 --- a/src/v/redpanda/application_runtime.cc +++ b/src/v/redpanda/application_runtime.cc @@ -12,6 +12,7 @@ #include "cluster/controller.h" #include "cluster/utils/partition_change_notifier_impl.h" #include "cluster_link/service.h" +#include "cluster_link/shadow_link_report_cache.h" #include "config/configuration.h" #include "config/node_config.h" #include "datalake/coordinator/catalog_factory.h" @@ -285,6 +286,23 @@ void application::wire_up_runtime_services( scheduling_groups::instance().cluster_linking_sg()) .get(); + construct_service( + _shadow_link_report_cache, + ss::sharded_parameter([] { + return config::shard_local_cfg() + .shadow_link_report_caching_ttl_ms.bind(); + }), + ss::sharded_parameter( + [this] -> std::unique_ptr { + if (ss::this_shard_id() == 0) { + return std::make_unique< + cluster_link::default_shadow_link_report_fetcher>( + &_cluster_link_service); + } + return std::unique_ptr{}; + })) + .get(); + syschecks::systemd_message("Creating kafka usage manager frontend").get(); construct_service( usage_manager,