Skip to content

Commit a26ec3c

Browse files
authored
Merge pull request #27724 from wdberkeley/rc-add-objects-retries
cloud_topics/reconciler: add retry mechanism for transport errors
2 parents 3ebe089 + 3d7d663 commit a26ec3c

File tree

3 files changed

+49
-3
lines changed

3 files changed

+49
-3
lines changed

src/v/cloud_topics/reconciler/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ redpanda_cc_library(
5252
"//src/v/model",
5353
"//src/v/random:generators",
5454
"//src/v/ssx:future_util",
55+
"//src/v/utils:retry_chain_node",
5556
"@abseil-cpp//absl/container:btree",
5657
"@seastar",
5758
],

src/v/cloud_topics/reconciler/reconciler.cc

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,15 @@
2424
#include "model/namespace.h"
2525
#include "random/generators.h"
2626
#include "ssx/future-util.h"
27+
#include "utils/retry_chain_node.h"
2728

2829
#include <seastar/core/shared_ptr.hh>
2930
#include <seastar/util/log.hh>
3031

32+
#include <chrono>
33+
34+
using namespace std::chrono_literals;
35+
3136
namespace {
3237
ss::logger lg("reconciler");
3338

@@ -535,6 +540,37 @@ reconciler::add_object_metadata(
535540
co_return std::expected<void, reconcile_error>{};
536541
}
537542

543+
// Add objects to the L1 metastore, retrying only on transport errors.
//
// Attempts are gated by permits from a retry_chain_node that is constructed
// with the reconciler's abort source `_as`, a deadline of now + 5s, and a
// 100ms backoff. Between attempts the coroutine sleeps for the delay carried
// by the permit.
//
// Returns:
//  - the metastore's add_response as soon as one attempt succeeds;
//  - the metastore error, immediately, if it is anything other than
//    errc::transport_error (such errors are logged and never retried);
//  - errc::transport_error once no further attempt is permitted.
//
// NOTE(review): ss::sleep_abortable presumably fails with an exception if the
// abort source fires during the backoff sleep, so an abort would surface as an
// exception rather than an errc -- confirm that callers expect this.
ss::future<std::expected<l1::metastore::add_response, l1::metastore::errc>>
reconciler::add_objects_with_retry(
  std::unique_ptr<l1::metastore::object_metadata_builder> meta_builder,
  l1::metastore::term_offset_map_t terms) {
    static constexpr auto timeout = 5s;
    static constexpr auto backoff = 100ms;

    retry_chain_node rtc(_as, ss::lowres_clock::now() + timeout, backoff);
    // All logging below goes through ctxlog so that every message carries the
    // retry-chain context of this add_objects call.
    retry_chain_logger ctxlog(lg, rtc, "add_objects");
    for (auto permit = rtc.retry(); permit.is_allowed; permit = rtc.retry()) {
        // The builder and the term map are passed by reference so the same
        // inputs can be re-submitted on the next attempt.
        auto add_result = co_await _metastore->add_objects(
          *meta_builder, terms);

        if (add_result.has_value()) {
            co_return std::move(add_result).value();
        }

        const auto err = add_result.error();
        if (err != l1::metastore::errc::transport_error) {
            vlog(
              ctxlog.error,
              "Non-retryable error adding objects to the L1 metastore: {}",
              err);
            co_return std::unexpected(err);
        }

        // Make retried attempts visible; previously they were silent.
        vlog(
          ctxlog.debug,
          "Retryable error adding objects to the L1 metastore: {}",
          err);
        co_await ss::sleep_abortable(permit.delay, rtc.root_abort_source());
    }

    // No further attempt is permitted: report the transport failure.
    co_return std::unexpected(l1::metastore::errc::transport_error);
}
573+
538574
ss::future<std::expected<void, reconcile_error>> reconciler::commit_objects(
539575
const chunked_vector<built_object_metadata>& objects,
540576
std::unique_ptr<l1::metastore::object_metadata_builder> meta_builder) {
@@ -552,11 +588,11 @@ ss::future<std::expected<void, reconcile_error>> reconciler::commit_objects(
552588
}
553589
}
554590

555-
auto add_objects_result = co_await _metastore->add_objects(
556-
*meta_builder, terms);
591+
auto add_objects_result = co_await add_objects_with_retry(
592+
std::move(meta_builder), std::move(terms));
557593
if (!add_objects_result.has_value()) {
558594
vlog(
559-
lg.error,
595+
lg.warn,
560596
"Failed to add objects to the L1 metastore: {}",
561597
add_objects_result.error());
562598
// TODO: The objects have been uploaded. The reconciler could

src/v/cloud_topics/reconciler/reconciler.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,15 @@ class reconciler {
254254
make_reader(frontend*, kafka::offset start_offset, size_t);
255255

256256
private:
257+
/*
258+
* Call the metastore's add_objects, retrying with backoff (bounded by a deadline) while it fails with a transport error.
259+
* Any other metastore error is not retried and is returned to the caller; if retries run out, transport_error is returned.
260+
*/
261+
ss::future<std::expected<l1::metastore::add_response, l1::metastore::errc>>
262+
add_objects_with_retry(
263+
std::unique_ptr<l1::metastore::object_metadata_builder> meta_builder,
264+
l1::metastore::term_offset_map_t terms);
265+
257266
data_plane_api* _data_plane;
258267
l1::io* _l1_io;
259268
l1::metastore* _metastore;

0 commit comments

Comments
 (0)