Skip to content

Commit 35dc6d6

Browse files
committed
archival: Fence spillover command
Also, extract fence initialization into a method in the ntp_archiver to avoid code duplication. There is a change in the control flow in the 'apply_spillover' method. Previously, the spillover wouldn't stop in case of replication error causing the error to be repeated. The loop would use manifest to create a spillover manifest and replicate the command with archival STM. The replicate method waits until the command is applied and propagates the error back to the loop. In case of error the error was printed and the loop continued. Since the state of the manifest didn't change the loop would produce the same manifesta and the same command causing new failure. This commit breaks if the spillover command can't be applied. This guarantees forward progress. Signed-off-by: Evgeny Lazin <[email protected]>
1 parent 2a7cb4f commit 35dc6d6

File tree

2 files changed

+37
-43
lines changed

2 files changed

+37
-43
lines changed

src/v/cluster/archival/ntp_archiver_service.cc

Lines changed: 34 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,16 @@ ntp_archiver::ntp_archiver(
387387
}
388388
}
389389

390+
archival_stm_fence ntp_archiver::emit_rw_fence() {
391+
return {
392+
.read_write_fence
393+
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
394+
// Only use the rw-fence if the feature is enabled which requires
395+
// major version upgrade.
396+
.emit_rw_fence_cmd = emit_read_write_fence(_feature_table),
397+
};
398+
}
399+
390400
void ntp_archiver::log_collected_traces() noexcept {
391401
try {
392402
_rtclog.bypass_tracing([this] {
@@ -1053,13 +1063,7 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() {
10531063
std::optional<batch_result> result;
10541064
auto track_paused = _probe->register_archiver_on_hold(uploads_paused);
10551065
if (!uploads_paused) {
1056-
result = co_await upload_next_candidates(
1057-
archival_stm_fence{
1058-
.read_write_fence = fence,
1059-
// Only use the rw-fence if the feature is enabled which
1060-
// requires major version upgrade.
1061-
.emit_rw_fence_cmd = emit_read_write_fence(_feature_table),
1062-
});
1066+
result = co_await upload_next_candidates(emit_rw_fence());
10631067
}
10641068
if (result.has_value()) {
10651069
auto [compacted_upload_result, non_compacted_upload_result]
@@ -2687,13 +2691,7 @@ ss::future<> ntp_archiver::apply_archive_retention() {
26872691
co_return;
26882692
}
26892693

2690-
archival_stm_fence fence = {
2691-
.read_write_fence
2692-
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
2693-
// Only use the rw-fence if the feature is enabled which requires
2694-
// major version upgrade.
2695-
.emit_rw_fence_cmd = emit_read_write_fence(_feature_table),
2696-
};
2694+
auto fence = emit_rw_fence();
26972695

26982696
std::optional<size_t> retention_bytes = ntp_conf.retention_bytes();
26992697
std::optional<std::chrono::milliseconds> retention_ms
@@ -2761,13 +2759,7 @@ ss::future<> ntp_archiver::garbage_collect_archive() {
27612759
if (!may_begin_uploads()) {
27622760
co_return;
27632761
}
2764-
archival_stm_fence fence = {
2765-
.read_write_fence
2766-
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
2767-
// Only use the rw-fence if the feature is enabled which requires
2768-
// major version upgrade.
2769-
.emit_rw_fence_cmd = emit_read_write_fence(_feature_table),
2770-
};
2762+
auto fence = emit_rw_fence();
27712763
auto backlog = co_await _manifest_view->get_retention_backlog();
27722764
if (backlog.has_failure()) {
27732765
if (backlog.error() == cloud_storage::error_outcome::shutting_down) {
@@ -2996,7 +2988,7 @@ ss::future<> ntp_archiver::apply_spillover() {
29962988
if (!may_begin_uploads()) {
29972989
co_return;
29982990
}
2999-
2991+
archival_stm_fence fence = emit_rw_fence();
30002992
const auto manifest_upload_timeout = _conf->manifest_upload_timeout();
30012993
const auto manifest_upload_backoff = _conf->cloud_storage_initial_backoff();
30022994

@@ -3034,6 +3026,12 @@ ss::future<> ntp_archiver::apply_spillover() {
30343026
auto tail = [&]() {
30353027
cloud_storage::spillover_manifest tail(_ntp, _rev);
30363028
for (const auto& meta : manifest()) {
3029+
vlog(
3030+
_rtclog.trace,
3031+
"Adding segment {} to the spillover manifest that starts at "
3032+
"{}",
3033+
meta,
3034+
tail.get_start_offset().value_or(model::offset{}));
30373035
tail.add(meta);
30383036
// No performance impact since all writes here are
30393037
// sequential.
@@ -3090,6 +3088,13 @@ ss::future<> ntp_archiver::apply_spillover() {
30903088
auto deadline = ss::lowres_clock::now() + sync_timeout;
30913089

30923090
auto batch = _parent.archival_meta_stm()->batch_start(deadline, _as);
3091+
if (fence.emit_rw_fence_cmd) {
3092+
vlog(
3093+
_rtclog.debug,
3094+
"spillover, read-write fence: {}",
3095+
fence.read_write_fence);
3096+
batch.read_write_fence(fence.read_write_fence);
3097+
}
30933098
batch.spillover(spillover_meta);
30943099
if (manifest().get_archive_start_offset() == model::offset{}) {
30953100
vlog(
@@ -3111,12 +3116,15 @@ ss::future<> ntp_archiver::apply_spillover() {
31113116
_rtclog.warn,
31123117
"Failed to replicate spillover command: {}",
31133118
error.message());
3119+
break;
31143120
} else {
31153121
vlog(
31163122
_rtclog.info,
31173123
"Uploaded spillover manifest: {}",
31183124
tail.get_manifest_path(remote_path_provider()));
31193125
}
3126+
// Reset fence for the next iteration
3127+
fence = emit_rw_fence();
31203128
}
31213129
}
31223130

@@ -3212,13 +3220,7 @@ ss::future<> ntp_archiver::apply_retention() {
32123220
if (!may_begin_uploads()) {
32133221
co_return;
32143222
}
3215-
archival_stm_fence fence = {
3216-
.read_write_fence
3217-
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
3218-
// Only use the rw-fence if the feature is enabled which requires
3219-
// major version upgrade.
3220-
.emit_rw_fence_cmd = emit_read_write_fence(_feature_table),
3221-
};
3223+
auto fence = emit_rw_fence();
32223224
auto arch_so = manifest().get_archive_start_offset();
32233225
auto stm_so = manifest().get_start_offset();
32243226
if (arch_so != model::offset{} && arch_so != stm_so) {
@@ -3311,13 +3313,7 @@ ss::future<> ntp_archiver::garbage_collect() {
33113313
co_return;
33123314
}
33133315

3314-
archival_stm_fence fence = {
3315-
.read_write_fence
3316-
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
3317-
// Only use the rw-fence if the feature is enabled which requires
3318-
// major version upgrade.
3319-
.emit_rw_fence_cmd = emit_read_write_fence(_feature_table),
3320-
};
3316+
archival_stm_fence fence = emit_rw_fence();
33213317

33223318
// If we are about to delete segments, we must ensure that the remote
33233319
// manifest is fully up to date, so that it is definitely not referring
@@ -3442,13 +3438,8 @@ ntp_archiver::find_reupload_candidate(
34423438

34433439
ssx::composite_abort_source cas{caller_as, _as};
34443440

3445-
archival_stm_fence rw_fence{
3446-
.read_write_fence
3447-
= _parent.archival_meta_stm()->manifest().get_applied_offset(),
3448-
// Only use the rw-fence if the feature is enabled which requires
3449-
// major version upgrade.
3450-
.emit_rw_fence_cmd = emit_read_write_fence(_feature_table),
3451-
};
3441+
archival_stm_fence rw_fence = emit_rw_fence();
3442+
34523443
if (!may_begin_uploads()) {
34533444
co_return find_reupload_candidate_result{};
34543445
}

src/v/cluster/archival/ntp_archiver_service.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,9 @@ class ntp_archiver {
466466
static constexpr const char* segment_merger_ctx_label
467467
= "adjacent_segment_merger";
468468

469+
/// Create a fence value for the next STM operation
470+
archival_stm_fence emit_rw_fence();
471+
469472
/// Delete objects, return true on success and false otherwise
470473
ss::future<bool>
471474
batch_delete(std::vector<cloud_storage_clients::object_key> paths);

0 commit comments

Comments
 (0)