Skip to content

Commit

Permalink
archival: Use RW-fence in housekeeping
Browse files Browse the repository at this point in the history
This commit enables rw-fence mechanism for housekeeping reuploads. In
case if the housekeeping is running on a stale version of the STM
snapshot the update will be rejected.

Signed-off-by: Evgeny Lazin <[email protected]>
  • Loading branch information
Lazin committed Dec 19, 2024
1 parent 774e7e4 commit 9cb55f3
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 48 deletions.
29 changes: 16 additions & 13 deletions src/v/cluster/archival/adjacent_segment_merger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,29 +213,32 @@ adjacent_segment_merger::run(run_quota_t quota) {
const cloud_storage::partition_manifest& manifest) {
return scan_manifest(local_start_offset, manifest);
};
auto [archiver_units, upl] = co_await _archiver.find_reupload_candidate(
scanner);
if (!upl.has_value()) {
auto find_res = co_await _archiver.find_reupload_candidate(scanner);
if (!find_res.locks.has_value()) {
vlog(_ctxlog.debug, "No more upload candidates");
co_return result;
}
vassert(archiver_units.has_value(), "Must take archiver units");
auto next = model::next_offset(upl->candidate.final_offset);
vassert(find_res.units.has_value(), "Must take archiver units");
auto next = model::next_offset(find_res.locks->candidate.final_offset);
vlog(
_ctxlog.debug,
"Going to upload segment {}, num source segments {}, last offset {}",
upl->candidate.exposed_name,
upl->candidate.sources.size(),
upl->candidate.final_offset);
for (const auto& src : upl->candidate.sources) {
"Going to upload segment {}, num source segments {}, last offset {}, "
"read-write-fence value: {}",
find_res.locks->candidate.exposed_name,
find_res.locks->candidate.sources.size(),
find_res.locks->candidate.final_offset,
find_res.read_write_fence.read_write_fence);
for (const auto& src : find_res.locks->candidate.sources) {
vlog(
_ctxlog.debug,
"Local log segment {} found, size {}",
src->filename(),
src->size_bytes());
}

auto uploaded = co_await _archiver.upload(
std::move(*archiver_units), std::move(*upl), std::ref(_root_rtc));
std::move(find_res), std::ref(_root_rtc));

if (uploaded) {
_last = next;
result.status = run_status::ok;
Expand All @@ -246,14 +249,14 @@ adjacent_segment_merger::run(run_quota_t quota) {
result.remaining = result.remaining - run_quota_t{1};
vlog(
_ctxlog.debug,
"Successfuly uploaded segment, new last offfset is {}",
"Successfully uploaded segment, new last offset is {}",
_last);
} else {
// Upload failed
result.status = run_status::failed;
vlog(
_ctxlog.debug,
"Failed to upload segment, last offfset is {}",
"Failed to upload segment, last offset is {}",
_last);
}
}
Expand Down
86 changes: 56 additions & 30 deletions src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3301,18 +3301,23 @@ ntp_archiver::get_housekeeping_jobs() {
return res;
}

ss::future<std::pair<
std::optional<ssx::semaphore_units>,
std::optional<upload_candidate_with_locks>>>
ss::future<ntp_archiver::find_reupload_candidate_result>
ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) {
ss::gate::holder holder(_gate);
archival_stm_fence rw_fence{
.read_write_fence
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
.unsafe_add = false,
};
if (!may_begin_uploads()) {
co_return std::make_pair(std::nullopt, std::nullopt);
co_return find_reupload_candidate_result{
std::nullopt, std::nullopt, {}};
}
auto run = scanner(_parent.raft_start_offset(), manifest());
if (!run.has_value()) {
vlog(_rtclog.debug, "Scan didn't resulted in upload candidate");
co_return std::make_pair(std::nullopt, std::nullopt);
co_return find_reupload_candidate_result{
std::nullopt, std::nullopt, {}};
} else {
vlog(_rtclog.debug, "Scan result: {}", run);
}
Expand All @@ -3331,18 +3336,16 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) {
auto candidate = co_await collector.make_upload_candidate(
_conf->upload_io_priority, _conf->segment_upload_timeout());

using ret_t = std::pair<
std::optional<ssx::semaphore_units>,
std::optional<upload_candidate_with_locks>>;
co_return ss::visit(
candidate,
[](std::monostate) -> ret_t {
[](std::monostate) -> find_reupload_candidate_result {
vassert(
false,
"unexpected default re-upload candidate creation result");
},
[this, &run, units = std::move(units)](
upload_candidate_with_locks& upload_candidate) mutable -> ret_t {
[this, &run, &rw_fence, units = std::move(units)](
upload_candidate_with_locks& upload_candidate) mutable
-> find_reupload_candidate_result {
if (
upload_candidate.candidate.content_length
!= run->meta.size_bytes
Expand All @@ -3357,27 +3360,28 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) {
"{}, run: {}",
upload_candidate.candidate,
run->meta);
return std::make_pair(std::nullopt, std::nullopt);
return {std::nullopt, std::nullopt, {}};
}

return std::make_pair(
std::move(units), std::move(upload_candidate));
return {std::move(units), std::move(upload_candidate), rw_fence};
},
[this](skip_offset_range& skip_offsets) -> ret_t {
[this](
skip_offset_range& skip_offsets) -> find_reupload_candidate_result {
vlog(
_rtclog.warn,
"Failed to make reupload candidate: {}",
skip_offsets.reason);
return std::make_pair(std::nullopt, std::nullopt);
return {std::nullopt, std::nullopt, {}};
},
[this](candidate_creation_error& error) -> ret_t {
[this](
candidate_creation_error& error) -> find_reupload_candidate_result {
const auto log_level = log_level_for_error(error);
vlogl(
_rtclog,
log_level,
"Failed to make reupload candidate: {}",
error);
return std::make_pair(std::nullopt, std::nullopt);
return {std::nullopt, std::nullopt, {}};
});
}
// segment_name exposed_name;
Expand All @@ -3395,18 +3399,26 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) {
= cloud_storage::partition_manifest::generate_remote_segment_name(
run->meta);
// Create a remote upload candidate
co_return std::make_pair(
std::move(units), upload_candidate_with_locks{std::move(candidate)});
co_return find_reupload_candidate_result{
std::move(units),
upload_candidate_with_locks{std::move(candidate)},
rw_fence};
}

ss::future<bool> ntp_archiver::upload(
ssx::semaphore_units archiver_units,
upload_candidate_with_locks upload_locks,
find_reupload_candidate_result find_res,
std::optional<std::reference_wrapper<retry_chain_node>> source_rtc) {
ss::gate::holder holder(_gate);
auto units = std::move(archiver_units);
if (upload_locks.candidate.sources.size() > 0) {
co_return co_await do_upload_local(std::move(upload_locks), source_rtc);
if (!find_res.locks.has_value() || !find_res.units.has_value()) {
// The method shouldn't be called if this is the case
co_return false;
}
auto units = std::move(find_res.units);
if (find_res.locks->candidate.sources.size() > 0) {
co_return co_await do_upload_local(
find_res.read_write_fence,
std::move(find_res.locks.value()),
source_rtc);
}
// Currently, the uploading of remote segments is disabled and
// the only reason why the list of locks is empty is truncation.
Expand All @@ -3417,6 +3429,7 @@ ss::future<bool> ntp_archiver::upload(
}

ss::future<bool> ntp_archiver::do_upload_local(
archival_stm_fence fence,
upload_candidate_with_locks upload_locks,
std::optional<std::reference_wrapper<retry_chain_node>> source_rtc) {
if (!may_begin_uploads()) {
Expand Down Expand Up @@ -3531,14 +3544,27 @@ ss::future<bool> ntp_archiver::do_upload_local(
? _parent.highest_producer_id()
: model::producer_id{};
auto deadline = ss::lowres_clock::now() + _conf->manifest_upload_timeout();
auto error = co_await _parent.archival_meta_stm()->add_segments(

auto builder = _parent.archival_meta_stm()->batch_start(deadline, _as);
if (!fence.unsafe_add) {
vlog(
archival_log.debug,
"(2) fence value is: {}, unsafe add: {}, manifest last applied "
"offset: {}, manifest in-sync offset: {}",
fence.read_write_fence,
fence.unsafe_add,
_parent.archival_meta_stm()->manifest().get_applied_offset(),
_parent.archival_meta_stm()->get_insync_offset());
builder.read_write_fence(fence.read_write_fence);
}
builder.add_segments(
{meta},
std::nullopt,
highest_producer_id,
deadline,
_as,
checks_disabled ? cluster::segment_validated::no
: cluster::segment_validated::yes);
builder.update_highest_producer_id(highest_producer_id);

auto error = co_await builder.replicate();

if (error != cluster::errc::success && error != cluster::errc::not_leader) {
vlog(
_rtclog.warn,
Expand Down
14 changes: 9 additions & 5 deletions src/v/cluster/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,12 @@ class ntp_archiver {
model::offset local_start_offset,
const cloud_storage::partition_manifest& manifest)>;

struct find_reupload_candidate_result {
std::optional<ssx::semaphore_units> units;
std::optional<upload_candidate_with_locks> locks;
archival_stm_fence read_write_fence;
};

/// Find upload candidate
///
/// Depending on the output of the 'scanner' the upload candidate
Expand All @@ -345,9 +351,7 @@ class ntp_archiver {
///
/// \param scanner is a user provided function used to find upload candidate
/// \return {nullopt, nullopt} or the archiver lock and upload candidate
ss::future<std::pair<
std::optional<ssx::semaphore_units>,
std::optional<upload_candidate_with_locks>>>
ss::future<find_reupload_candidate_result>
find_reupload_candidate(manifest_scanner_t scanner);

/**
Expand All @@ -366,8 +370,7 @@ class ntp_archiver {
* \return true on success and false otherwise
*/
ss::future<bool> upload(
ssx::semaphore_units archiver_units,
upload_candidate_with_locks candidate,
find_reupload_candidate_result find_res,
std::optional<std::reference_wrapper<retry_chain_node>> source_rtc);

/// Return reference to partition manifest from archival STM
Expand Down Expand Up @@ -445,6 +448,7 @@ class ntp_archiver {
ss::future<> maybe_complete_flush();

ss::future<bool> do_upload_local(
archival_stm_fence fence,
upload_candidate_with_locks candidate,
std::optional<std::reference_wrapper<retry_chain_node>> source_rtc);
ss::future<bool> do_upload_remote(
Expand Down

0 comments on commit 9cb55f3

Please sign in to comment.