archival: Use read-write fence in the ntp_archiver
The fence is a value produced by recording the offset of the last
applied archival STM command. The fence is added to the archival STM
configuration batch. If no commands were applied to the STM in the
meantime, the offset of the last applied command is unchanged and the
configuration batch is applied. Otherwise, if some command was applied
after the record batch was constructed, the batch is not applied.

This is essentially an optimistic concurrency control mechanism. The
commands are already present in the STM but were not used until now.
The existing metadata validation mechanism is disabled; we only check
that the actual segment data matches expectations.
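
For context, here is a minimal, self-contained sketch of the fence check
described above. `archival_stm_fence`, its fields, and
`last_applied_offset()` come from this change; the `archival_stm_stub`
class, its `apply_batch()` method, and the offset arithmetic are
hypothetical stand-ins for the STM apply path, which is not part of this
diff.

```cpp
// Simplified illustration of the read-write fence idea; not the actual
// Redpanda classes.
#include <cstdint>
#include <iostream>

// Stand-in for model::offset.
using offset_t = int64_t;

// Mirrors the struct added in ntp_archiver_service.h.
struct archival_stm_fence {
    offset_t read_write_fence{-1};
    bool unsafe_add{false}; // disable fencing (used by tests)
};

// Hypothetical stand-in for the archival STM: it tracks the offset of the
// last applied command and rejects batches whose fence is stale.
class archival_stm_stub {
public:
    offset_t last_applied_offset() const { return last_applied_; }

    // Applying a fenced batch succeeds only if no other command was applied
    // since the fence was recorded (optimistic concurrency control).
    bool apply_batch(const archival_stm_fence& fence, int num_commands) {
        if (!fence.unsafe_add && fence.read_write_fence != last_applied_) {
            return false; // another writer got in first; caller must retry
        }
        last_applied_ += num_commands; // assumed: offset advances per command
        return true;
    }

private:
    offset_t last_applied_{0};
};

int main() {
    archival_stm_stub stm;

    // 1. Record the fence before scheduling uploads.
    archival_stm_fence fence{.read_write_fence = stm.last_applied_offset()};

    // 2. (Uploads happen here; a concurrent command may be applied meanwhile.)

    // 3. Replicate the batch: the fence comes first, followed by the
    //    add_segments / mark_clean / update_highest_producer_id commands.
    bool ok = stm.apply_batch(fence, /*num_commands=*/3);
    std::cout << "first batch applied: " << std::boolalpha << ok << '\n';

    // A stale fence (recorded before the batch above) is rejected.
    bool stale = stm.apply_batch(fence, 1);
    std::cout << "stale batch applied: " << stale << '\n';
}
```

In the actual patch the fence travels from `upload_until_term_change_legacy()`
through `upload_next_candidates()` into `wait_uploads()`, where
`batch_builder.read_write_fence(...)` is emitted before `add_segments(...)`,
so the fence guards every command that follows it in the same record batch.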
Lazin committed Dec 14, 2024
1 parent 60ad2f4 commit 2e39592
Showing 5 changed files with 53 additions and 36 deletions.
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/produce_utils.h
@@ -90,7 +90,8 @@ class remote_segment_generator {
co_return -1;
}
if (
(co_await archiver.upload_next_candidates())
(co_await archiver.upload_next_candidates(
archival::archival_stm_fence{.unsafe_add = true}))
.non_compacted_upload_result.num_failed
> 0) {
co_return -1;
59 changes: 28 additions & 31 deletions src/v/cluster/archival/ntp_archiver_service.cc
@@ -760,8 +760,13 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() {
continue;
}

// This is the offset of the last applied command. It is used
// as a fence to implement optimistic concurrency control.
auto fence = _parent.archival_meta_stm()->last_applied_offset();

auto [non_compacted_upload_result, compacted_upload_result]
= co_await upload_next_candidates();
= co_await upload_next_candidates(
archival_stm_fence{.read_write_fence = fence});
if (non_compacted_upload_result.num_failed != 0) {
// The logic in class `remote` already does retries: if we get here,
// it means the upload failed after several retries, justifying
@@ -2087,6 +2092,7 @@ ntp_archiver::schedule_uploads(std::vector<upload_context> loop_contexts) {
}

ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads(
archival_stm_fence fence,
std::vector<scheduled_upload> scheduled,
segment_upload_kind segment_kind,
bool inline_manifest) {
@@ -2164,30 +2170,6 @@ ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads(
const bool checks_disabled
= config::shard_local_cfg()
.cloud_storage_disable_upload_consistency_checks.value();
if (!checks_disabled) {
std::vector<cloud_storage::segment_meta> meta;
for (size_t i = 0; i < segment_results.size(); i++) {
meta.push_back(scheduled[ixupload[i]].meta.value());
}
size_t num_accepted = manifest().safe_segment_meta_to_add(
std::move(meta));
if (num_accepted < segment_results.size()) {
vlog(
_rtclog.warn,
"Metadata inconsistency detected, {} segments uploaded but only "
"{} can be added",
segment_results.size(),
num_accepted);
_probe->gap_detected(model::offset(
static_cast<int64_t>(segment_results.size() - num_accepted)));
}
vassert(
num_accepted <= segment_results.size(),
"Accepted {} segments but only {} segments are uploaded",
num_accepted,
segment_results.size());
segment_results.resize(num_accepted);
}
for (size_t i = 0; i < segment_results.size(); i++) {
if (
segment_results[i].result()
@@ -2256,14 +2238,25 @@ ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads(
? _parent.highest_producer_id()
: model::producer_id{};

auto error = co_await _parent.archival_meta_stm()->add_segments(
auto batch_builder = _parent.archival_meta_stm()->batch_start(
deadline, _as);
if (!fence.unsafe_add) {
// The fence should be added first because it can only
// affect commands which are following it in the same record
// batch.
batch_builder.read_write_fence(fence.read_write_fence);
}
batch_builder.add_segments(
mdiff,
manifest_clean_offset,
highest_producer_id,
deadline,
_as,
checks_disabled ? cluster::segment_validated::no
: cluster::segment_validated::yes);
if (manifest_clean_offset.has_value()) {
batch_builder.mark_clean(manifest_clean_offset.value());
}
batch_builder.update_highest_producer_id(highest_producer_id);

auto error = co_await batch_builder.replicate();

if (
error != cluster::errc::success
&& error != cluster::errc::not_leader) {
@@ -2303,6 +2296,7 @@ ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads(
}

ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads(
archival_stm_fence fence,
std::vector<ntp_archiver::scheduled_upload> scheduled) {
// Split the set of scheduled uploads into compacted and non compacted
// uploads, and then wait for them separately. They can also be waited on
@@ -2341,10 +2335,12 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads(
auto [non_compacted_result, compacted_result]
= co_await ss::when_all_succeed(
wait_uploads(
fence,
std::move(non_compacted_uploads),
segment_upload_kind::non_compacted,
inline_manifest_in_non_compacted_uploads),
wait_uploads(
fence,
std::move(compacted_uploads),
segment_upload_kind::compacted,
!inline_manifest_in_non_compacted_uploads));
@@ -2374,6 +2370,7 @@ model::offset ntp_archiver::max_uploadable_offset_exclusive() const {
}

ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates(
archival_stm_fence fence,
std::optional<model::offset> unsafe_max_offset_override_exclusive) {
auto max_offset_exclusive = unsafe_max_offset_override_exclusive
? *unsafe_max_offset_override_exclusive
@@ -2389,7 +2386,7 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates(
auto scheduled_uploads = co_await schedule_uploads(
max_offset_exclusive);
co_return co_await wait_all_scheduled_uploads(
std::move(scheduled_uploads));
fence, std::move(scheduled_uploads));
} catch (const ss::gate_closed_exception&) {
} catch (const ss::abort_requested_exception&) {
}
14 changes: 14 additions & 0 deletions src/v/cluster/archival/ntp_archiver_service.h
@@ -114,6 +114,17 @@ enum class wait_result { not_in_progress, complete, lost_leadership, failed };

std::ostream& operator<<(std::ostream& os, wait_result wr);

/// Fence value for the archival STM.
/// The value is used to implement optimistic
/// concurrency control.
struct archival_stm_fence {
// Offset of the last command added to the
// archival STM
model::offset read_write_fence;
// Disable fencing in tests
bool unsafe_add{false};
};

/// This class performs per-ntp archival workload. Every ntp can be
/// processed independently, without the knowledge about others. All
/// 'ntp_archiver' instances that the shard possesses are supposed to be
@@ -225,6 +236,7 @@ class ntp_archiver {
/// offset.
/// \return future that returns number of uploaded/failed segments
virtual ss::future<batch_result> upload_next_candidates(
archival_stm_fence fence,
std::optional<model::offset> unsafe_max_offset_override_exclusive
= std::nullopt);

@@ -498,13 +510,15 @@ class ntp_archiver {
///
/// Update the probe and manifest
ss::future<ntp_archiver::batch_result> wait_all_scheduled_uploads(
archival_stm_fence fence,
std::vector<ntp_archiver::scheduled_upload> scheduled);

/// Waits for scheduled segment uploads. The uploaded segments could be
/// compacted or non-compacted, the actions taken are similar in both
/// cases with the major difference being the probe updates done after
/// the upload.
ss::future<ntp_archiver::upload_group_result> wait_uploads(
archival_stm_fence fence,
std::vector<scheduled_upload> scheduled,
segment_upload_kind segment_kind,
bool inline_manifest);
4 changes: 3 additions & 1 deletion src/v/cluster/archival/tests/ntp_archiver_test.cc
@@ -404,7 +404,9 @@ FIXTURE_TEST(
});

retry_chain_node fib(never_abort);
auto res = archiver.upload_next_candidates().get();
auto res = archiver
.upload_next_candidates(archival_stm_fence{.unsafe_add = true})
.get();

auto&& [non_compacted_result, compacted_result] = res;
BOOST_REQUIRE_EQUAL(non_compacted_result.num_succeeded, 0);
9 changes: 6 additions & 3 deletions src/v/cluster/archival/tests/service_fixture.cc
@@ -10,6 +10,7 @@

#include "cluster/archival/tests/service_fixture.h"

#include "archival/ntp_archiver_service.h"
#include "base/seastarx.h"
#include "bytes/iobuf.h"
#include "bytes/iobuf_parser.h"
@@ -557,7 +558,8 @@ archiver_fixture::do_upload_next(
if (model::timeout_clock::now() > deadline) {
co_return archival::ntp_archiver::batch_result{};
}
auto result = co_await archiver.upload_next_candidates(lso);
auto result = co_await archiver.upload_next_candidates(
archival_stm_fence{.unsafe_add = true}, lso);
auto num_success = result.compacted_upload_result.num_succeeded
+ result.non_compacted_upload_result.num_succeeded;
if (num_success > 0) {
@@ -581,8 +583,9 @@ void archiver_fixture::upload_and_verify(
tests::cooperative_spin_wait_with_timeout(
10s,
[&archiver, expected, lso]() {
return archiver.upload_next_candidates(lso).then(
[expected](auto result) { return result == expected; });
return archiver
.upload_next_candidates(archival_stm_fence{.unsafe_add = true}, lso)
.then([expected](auto result) { return result == expected; });
})
.get();
}
