Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/cloud_topics/reconciler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ redpanda_cc_library(
"//src/v/model",
"//src/v/random:generators",
"//src/v/ssx:future_util",
"//src/v/utils:retry_chain_node",
"@abseil-cpp//absl/container:btree",
"@seastar",
],
Expand Down
42 changes: 39 additions & 3 deletions src/v/cloud_topics/reconciler/reconciler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
#include "model/namespace.h"
#include "random/generators.h"
#include "ssx/future-util.h"
#include "utils/retry_chain_node.h"

#include <seastar/core/shared_ptr.hh>
#include <seastar/util/log.hh>

#include <chrono>

using namespace std::chrono_literals;

namespace {
ss::logger lg("reconciler");

Expand Down Expand Up @@ -534,6 +539,37 @@ reconciler::add_object_metadata(
co_return std::expected<void, reconcile_error>{};
}

ss::future<std::expected<l1::metastore::add_response, l1::metastore::errc>>
reconciler::add_objects_with_retry(
std::unique_ptr<l1::metastore::object_metadata_builder> meta_builder,
l1::metastore::term_offset_map_t terms) {
static constexpr auto timeout = 5s;
static constexpr auto backoff = 100ms;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may have some existing config parameters that can be used here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Poked around a bit and couldn't find any config properties or general parameters in cloud topics for this. Did you have something in mind? Maybe there are some numbers that could be applied multiple places we could factor out into configuration?


retry_chain_node rtc(_as, ss::lowres_clock::now() + timeout, backoff);
retry_chain_logger ctxlog(lg, rtc, "add_objects");
for (auto permit = rtc.retry(); permit.is_allowed; permit = rtc.retry()) {
auto add_result = co_await _metastore->add_objects(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the code inside the metastore->add_objects retry? It should have better "understanding" on how the requests should be retried.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requests are idempotent, and Andrew considered this to be the simplest thing: https://redpandadata.atlassian.net/browse/CORE-13427?focusedCommentId=89779

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not ideal because this way we're baking in some assumptions about the metastore and it's commands into the implementation of the reconciler (specifically, the fact that the command is idempotent and that the backoff is exponential) but maybe it's not a major concern at this moment

*meta_builder, terms);

if (add_result.has_value()) {
co_return std::move(add_result).value();
}

if (add_result.error() != l1::metastore::errc::transport_error) {
vlog(
lg.error,
"Non-retryable error adding objects to the L1 metastore: {}",
add_result.error());
co_return std::unexpected(add_result.error());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that we got metastore transport_error doesn't mean that the add_objects request wasn't applied to the metastore. Is add_objects idempotent?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is


co_await ss::sleep_abortable(permit.delay, rtc.root_abort_source());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will throw if aborted, everywhere else we use std::unexpected. Should we do that too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's inside the reconciler round's try-catch so if it aborts (presumably because of shutdown) it'll abandon the round, which I think is the right thing to do.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree it's fine, but it's get a little hard to review when we are just mixing std::expected and exceptions. a reasonable pattern might be to have a top-level try/catch which treats exceptions as surprises.

}

co_return std::unexpected(l1::metastore::errc::transport_error);
}

ss::future<std::expected<void, reconcile_error>> reconciler::commit_objects(
const chunked_vector<built_object_metadata>& objects,
std::unique_ptr<l1::metastore::object_metadata_builder> meta_builder) {
Expand All @@ -551,11 +587,11 @@ ss::future<std::expected<void, reconcile_error>> reconciler::commit_objects(
}
}

auto add_objects_result = co_await _metastore->add_objects(
*meta_builder, terms);
auto add_objects_result = co_await add_objects_with_retry(
std::move(meta_builder), std::move(terms));
if (!add_objects_result.has_value()) {
vlog(
lg.error,
lg.warn,
"Failed to add objects to the L1 metastore: {}",
add_objects_result.error());
// TODO: The objects have been uploaded. The reconciler could
Expand Down
9 changes: 9 additions & 0 deletions src/v/cloud_topics/reconciler/reconciler.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ class reconciler {
ntp_to_topic_id_partition(const model::ntp& ntp) const;

private:
/*
* Retry metastore add_objects calls on transport errors.
* Other metastore errors are not retried.
*/
ss::future<std::expected<l1::metastore::add_response, l1::metastore::errc>>
add_objects_with_retry(
std::unique_ptr<l1::metastore::object_metadata_builder> meta_builder,
l1::metastore::term_offset_map_t terms);

data_plane_api* _data_plane;
l1::io* _l1_io;
l1::metastore* _metastore;
Expand Down