From 3d7d663333a2834d5b987cf83f376cd104809d89 Mon Sep 17 00:00:00 2001 From: Will Berkeley Date: Tue, 23 Sep 2025 15:20:38 -0700 Subject: [PATCH] cloud_topics/reconciler: add retry mechanism for metastore transport errors Add retry logic to reconciler's metastore add_objects calls to handle transport errors. Previously, the reconciler would immediately fail and abandon the reconciliation round on any metastore error. --- src/v/cloud_topics/reconciler/BUILD | 1 + src/v/cloud_topics/reconciler/reconciler.cc | 42 +++++++++++++++++++-- src/v/cloud_topics/reconciler/reconciler.h | 9 +++++ 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/src/v/cloud_topics/reconciler/BUILD b/src/v/cloud_topics/reconciler/BUILD index 392b31351f2cb..06defa62909cd 100644 --- a/src/v/cloud_topics/reconciler/BUILD +++ b/src/v/cloud_topics/reconciler/BUILD @@ -52,6 +52,7 @@ redpanda_cc_library( "//src/v/model", "//src/v/random:generators", "//src/v/ssx:future_util", + "//src/v/utils:retry_chain_node", "@abseil-cpp//absl/container:btree", "@seastar", ], diff --git a/src/v/cloud_topics/reconciler/reconciler.cc b/src/v/cloud_topics/reconciler/reconciler.cc index e338813e23419..b5fb7d2ae744f 100644 --- a/src/v/cloud_topics/reconciler/reconciler.cc +++ b/src/v/cloud_topics/reconciler/reconciler.cc @@ -24,10 +24,15 @@ #include "model/namespace.h" #include "random/generators.h" #include "ssx/future-util.h" +#include "utils/retry_chain_node.h" #include #include +#include + +using namespace std::chrono_literals; + namespace { ss::logger lg("reconciler"); @@ -534,6 +539,37 @@ reconciler::add_object_metadata( co_return std::expected{}; } +ss::future> +reconciler::add_objects_with_retry( + std::unique_ptr meta_builder, + l1::metastore::term_offset_map_t terms) { + static constexpr auto timeout = 5s; + static constexpr auto backoff = 100ms; + + retry_chain_node rtc(_as, ss::lowres_clock::now() + timeout, backoff); + retry_chain_logger ctxlog(lg, rtc, "add_objects"); + for (auto permit = rtc.retry(); permit.is_allowed; permit = rtc.retry()) { + auto add_result = co_await _metastore->add_objects( + *meta_builder, terms); + + if (add_result.has_value()) { + co_return std::move(add_result).value(); + } + + if (add_result.error() != l1::metastore::errc::transport_error) { + vlog( + lg.error, + "Non-retryable error adding objects to the L1 metastore: {}", + add_result.error()); + co_return std::unexpected(add_result.error()); + } + + co_await ss::sleep_abortable(permit.delay, rtc.root_abort_source()); + } + + co_return std::unexpected(l1::metastore::errc::transport_error); +} + ss::future> reconciler::commit_objects( const chunked_vector& objects, std::unique_ptr meta_builder) { @@ -551,11 +587,11 @@ ss::future> reconciler::commit_objects( } } - auto add_objects_result = co_await _metastore->add_objects( - *meta_builder, terms); + auto add_objects_result = co_await add_objects_with_retry( + std::move(meta_builder), std::move(terms)); if (!add_objects_result.has_value()) { vlog( - lg.error, + lg.warn, "Failed to add objects to the L1 metastore: {}", add_objects_result.error()); // TODO: The objects have been uploaded. The reconciler could diff --git a/src/v/cloud_topics/reconciler/reconciler.h b/src/v/cloud_topics/reconciler/reconciler.h index 37b2f5fccf2c5..303f3b64a005d 100644 --- a/src/v/cloud_topics/reconciler/reconciler.h +++ b/src/v/cloud_topics/reconciler/reconciler.h @@ -262,6 +262,15 @@ class reconciler { ntp_to_topic_id_partition(const model::ntp& ntp) const; private: + /* + * Retry metastore add_objects calls on transport errors. + * Other metastore errors are not retried. + */ + ss::future> + add_objects_with_retry( + std::unique_ptr meta_builder, + l1::metastore::term_offset_map_t terms); + data_plane_api* _data_plane; l1::io* _l1_io; l1::metastore* _metastore;