From 8b335484c5b2e84935933d4e88cc1db16367f16a Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Wed, 18 Dec 2024 14:07:42 -0500 Subject: [PATCH] archival: Use RW-fence in housekeeping This commit enables the rw-fence mechanism for housekeeping reuploads. If housekeeping is running on a stale version of the STM snapshot, the update will be rejected. Signed-off-by: Evgeny Lazin <4lazin@gmail.com> --- .../archival/adjacent_segment_merger.cc | 29 ++++--- .../cluster/archival/ntp_archiver_service.cc | 86 ++++++++++++------- src/v/cluster/archival/ntp_archiver_service.h | 14 +-- 3 files changed, 81 insertions(+), 48 deletions(-) diff --git a/src/v/cluster/archival/adjacent_segment_merger.cc b/src/v/cluster/archival/adjacent_segment_merger.cc index 1809fb913029..4694ec975423 100644 --- a/src/v/cluster/archival/adjacent_segment_merger.cc +++ b/src/v/cluster/archival/adjacent_segment_merger.cc @@ -213,29 +213,32 @@ adjacent_segment_merger::run(run_quota_t quota) { const cloud_storage::partition_manifest& manifest) { return scan_manifest(local_start_offset, manifest); }; - auto [archiver_units, upl] = co_await _archiver.find_reupload_candidate( - scanner); - if (!upl.has_value()) { + auto find_res = co_await _archiver.find_reupload_candidate(scanner); + if (!find_res.locks.has_value()) { vlog(_ctxlog.debug, "No more upload candidates"); co_return result; } - vassert(archiver_units.has_value(), "Must take archiver units"); - auto next = model::next_offset(upl->candidate.final_offset); + vassert(find_res.units.has_value(), "Must take archiver units"); + auto next = model::next_offset(find_res.locks->candidate.final_offset); vlog( _ctxlog.debug, - "Going to upload segment {}, num source segments {}, last offset {}", - upl->candidate.exposed_name, - upl->candidate.sources.size(), - upl->candidate.final_offset); - for (const auto& src : upl->candidate.sources) { + "Going to upload segment {}, num source segments {}, last offset {}, " + "read-write-fence 
value: {}", + find_res.locks->candidate.exposed_name, + find_res.locks->candidate.sources.size(), + find_res.locks->candidate.final_offset, + find_res.read_write_fence.read_write_fence); + for (const auto& src : find_res.locks->candidate.sources) { vlog( _ctxlog.debug, "Local log segment {} found, size {}", src->filename(), src->size_bytes()); } + auto uploaded = co_await _archiver.upload( - std::move(*archiver_units), std::move(*upl), std::ref(_root_rtc)); + std::move(find_res), std::ref(_root_rtc)); + if (uploaded) { _last = next; result.status = run_status::ok; @@ -246,14 +249,14 @@ adjacent_segment_merger::run(run_quota_t quota) { result.remaining = result.remaining - run_quota_t{1}; vlog( _ctxlog.debug, - "Successfuly uploaded segment, new last offfset is {}", + "Successfully uploaded segment, new last offset is {}", _last); } else { // Upload failed result.status = run_status::failed; vlog( _ctxlog.debug, - "Failed to upload segment, last offfset is {}", + "Failed to upload segment, last offset is {}", _last); } } diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index c9ed834322ea..337e5d526a31 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -3301,18 +3301,23 @@ ntp_archiver::get_housekeeping_jobs() { return res; } -ss::future, - std::optional>> +ss::future ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) { ss::gate::holder holder(_gate); + archival_stm_fence rw_fence{ + .read_write_fence + = _parent.archival_meta_stm()->manifest().get_applied_offset(), + .unsafe_add = false, + }; if (!may_begin_uploads()) { - co_return std::make_pair(std::nullopt, std::nullopt); + co_return find_reupload_candidate_result{ + std::nullopt, std::nullopt, {}}; } auto run = scanner(_parent.raft_start_offset(), manifest()); if (!run.has_value()) { vlog(_rtclog.debug, "Scan didn't resulted in upload candidate"); - co_return 
std::make_pair(std::nullopt, std::nullopt); + co_return find_reupload_candidate_result{ + std::nullopt, std::nullopt, {}}; } else { vlog(_rtclog.debug, "Scan result: {}", run); } @@ -3331,18 +3336,16 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) { auto candidate = co_await collector.make_upload_candidate( _conf->upload_io_priority, _conf->segment_upload_timeout()); - using ret_t = std::pair< - std::optional, - std::optional>; co_return ss::visit( candidate, - [](std::monostate) -> ret_t { + [](std::monostate) -> find_reupload_candidate_result { vassert( false, "unexpected default re-upload candidate creation result"); }, - [this, &run, units = std::move(units)]( - upload_candidate_with_locks& upload_candidate) mutable -> ret_t { + [this, &run, &rw_fence, units = std::move(units)]( + upload_candidate_with_locks& upload_candidate) mutable + -> find_reupload_candidate_result { if ( upload_candidate.candidate.content_length != run->meta.size_bytes @@ -3357,27 +3360,28 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) { "{}, run: {}", upload_candidate.candidate, run->meta); - return std::make_pair(std::nullopt, std::nullopt); + return {std::nullopt, std::nullopt, {}}; } - return std::make_pair( - std::move(units), std::move(upload_candidate)); + return {std::move(units), std::move(upload_candidate), rw_fence}; }, - [this](skip_offset_range& skip_offsets) -> ret_t { + [this]( + skip_offset_range& skip_offsets) -> find_reupload_candidate_result { vlog( _rtclog.warn, "Failed to make reupload candidate: {}", skip_offsets.reason); - return std::make_pair(std::nullopt, std::nullopt); + return {std::nullopt, std::nullopt, {}}; }, - [this](candidate_creation_error& error) -> ret_t { + [this]( + candidate_creation_error& error) -> find_reupload_candidate_result { const auto log_level = log_level_for_error(error); vlogl( _rtclog, log_level, "Failed to make reupload candidate: {}", error); - return std::make_pair(std::nullopt, 
std::nullopt); + return {std::nullopt, std::nullopt, {}}; }); } // segment_name exposed_name; @@ -3395,18 +3399,26 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) { = cloud_storage::partition_manifest::generate_remote_segment_name( run->meta); // Create a remote upload candidate - co_return std::make_pair( - std::move(units), upload_candidate_with_locks{std::move(candidate)}); + co_return find_reupload_candidate_result{ + std::move(units), + upload_candidate_with_locks{std::move(candidate)}, + rw_fence}; } ss::future ntp_archiver::upload( - ssx::semaphore_units archiver_units, - upload_candidate_with_locks upload_locks, + find_reupload_candidate_result find_res, std::optional> source_rtc) { ss::gate::holder holder(_gate); - auto units = std::move(archiver_units); - if (upload_locks.candidate.sources.size() > 0) { - co_return co_await do_upload_local(std::move(upload_locks), source_rtc); + if (!find_res.locks.has_value() || !find_res.units.has_value()) { + // The method shouldn't be called if this is the case + co_return false; + } + auto units = std::move(find_res.units); + if (find_res.locks->candidate.sources.size() > 0) { + co_return co_await do_upload_local( + find_res.read_write_fence, + std::move(find_res.locks.value()), + source_rtc); } // Currently, the uploading of remote segments is disabled and // the only reason why the list of locks is empty is truncation. @@ -3417,6 +3429,7 @@ ss::future ntp_archiver::upload( } ss::future ntp_archiver::do_upload_local( + archival_stm_fence fence, upload_candidate_with_locks upload_locks, std::optional> source_rtc) { if (!may_begin_uploads()) { @@ -3531,14 +3544,27 @@ ss::future ntp_archiver::do_upload_local( ? 
_parent.highest_producer_id() : model::producer_id{}; auto deadline = ss::lowres_clock::now() + _conf->manifest_upload_timeout(); - auto error = co_await _parent.archival_meta_stm()->add_segments( + + auto builder = _parent.archival_meta_stm()->batch_start(deadline, _as); + if (!fence.unsafe_add) { + vlog( + archival_log.debug, + "(2) fence value is: {}, unsafe add: {}, manifest last applied " + "offset: {}, manifest in-sync offset: {}", + fence.read_write_fence, + fence.unsafe_add, + _parent.archival_meta_stm()->manifest().get_applied_offset(), + _parent.archival_meta_stm()->get_insync_offset()); + builder.read_write_fence(fence.read_write_fence); + } + builder.add_segments( {meta}, - std::nullopt, - highest_producer_id, - deadline, - _as, checks_disabled ? cluster::segment_validated::no : cluster::segment_validated::yes); + builder.update_highest_producer_id(highest_producer_id); + + auto error = co_await builder.replicate(); + if (error != cluster::errc::success && error != cluster::errc::not_leader) { vlog( _rtclog.warn, diff --git a/src/v/cluster/archival/ntp_archiver_service.h b/src/v/cluster/archival/ntp_archiver_service.h index 36b29ac58db5..cc1155152d9c 100644 --- a/src/v/cluster/archival/ntp_archiver_service.h +++ b/src/v/cluster/archival/ntp_archiver_service.h @@ -336,6 +336,12 @@ class ntp_archiver { model::offset local_start_offset, const cloud_storage::partition_manifest& manifest)>; + struct find_reupload_candidate_result { + std::optional units; + std::optional locks; + archival_stm_fence read_write_fence; + }; + /// Find upload candidate /// /// Depending on the output of the 'scanner' the upload candidate @@ -345,9 +351,7 @@ class ntp_archiver { /// /// \param scanner is a user provided function used to find upload candidate /// \return {nullopt, nullopt} or the archiver lock and upload candidate - ss::future, - std::optional>> + ss::future find_reupload_candidate(manifest_scanner_t scanner); /** @@ -366,8 +370,7 @@ class ntp_archiver { * \return 
true on success and false otherwise */ ss::future upload( - ssx::semaphore_units archiver_units, - upload_candidate_with_locks candidate, + find_reupload_candidate_result find_res, std::optional> source_rtc); /// Return reference to partition manifest from archival STM @@ -445,6 +448,7 @@ class ntp_archiver { ss::future<> maybe_complete_flush(); ss::future do_upload_local( + archival_stm_fence fence, upload_candidate_with_locks candidate, std::optional> source_rtc); ss::future do_upload_remote(