From b85acf1d6d95a8909a2287c888583407203b7769 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Tue, 17 Dec 2024 12:37:17 -0500 Subject: [PATCH 01/10] cluster: Fix archival metadata STM snapshot The snapshot doesn't serialize the applied-offset field of the manifest. This commit fixes that. Signed-off-by: Evgeny Lazin <4lazin@gmail.com> --- src/v/cluster/archival/archival_metadata_stm.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/archival/archival_metadata_stm.cc b/src/v/cluster/archival/archival_metadata_stm.cc index 4ea8480291893..f6b8a6ade4d69 100644 --- a/src/v/cluster/archival/archival_metadata_stm.cc +++ b/src/v/cluster/archival/archival_metadata_stm.cc @@ -1335,7 +1335,8 @@ archival_metadata_stm::take_local_snapshot(ssx::semaphore_units apply_units) { .last_partition_scrub = _manifest->last_partition_scrub(), .last_scrubbed_offset = _manifest->last_scrubbed_offset(), .detected_anomalies = _manifest->detected_anomalies(), - .highest_producer_id = _manifest->highest_producer_id()}); + .highest_producer_id = _manifest->highest_producer_id(), + .applied_offset = _manifest->get_applied_offset()}); auto snapshot_offset = last_applied_offset(); apply_units.return_all(); From dc684294efa3c29221d96dedc77457302fdc8afb Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Sat, 14 Dec 2024 13:48:42 -0500 Subject: [PATCH 02/10] archival: Use read-write fence in the ntp_archiver The fence is a value produced by recording the current offset of the last applied archival STM command. The fence is added to the archival STM config batch. If no commands were applied to the STM in the meantime, the offset of the last applied command is unchanged and the configuration batch is applied. Otherwise, if some command was applied after the record batch was constructed, the batch is not applied. This is essentially a concurrency control mechanism. The commands are already present in the STM but not used until now. The existing metadata validation mechanism stays.
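For illustration only (not part of the diff below), a minimal sketch of how the fence is intended to be used by the archiver, assembled from the APIs this series touches; treat it as an approximation of the call sites, executed inside a seastar coroutine, rather than the exact code:

    // Sketch: optimistic concurrency control with the read-write fence.
    // 'stm' stands in for the partition's archival_meta_stm.
    // 1. Record the offset of the last command applied to the archival STM.
    model::offset fence = stm->manifest().get_applied_offset();
    // ... build upload candidates and upload the segments to cloud storage ...
    // 2. Prepend the fence to the config batch that publishes the new metadata.
    auto builder = stm->batch_start(deadline, abort_source);
    builder.read_write_fence(fence);
    builder.add_segments(std::move(new_segments), cluster::segment_validated::yes);
    // 3. If any other command was applied to the STM after step 1, the fence no
    //    longer matches the applied offset and the whole batch is rejected with
    //    errc::concurrent_modification_error; otherwise the batch is applied.
    auto error = co_await builder.replicate();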
--- .../tests/cloud_storage_e2e_test.cc | 10 ++- .../tests/delete_records_e2e_test.cc | 8 ++- src/v/cloud_storage/tests/produce_utils.h | 3 +- .../topic_namespace_override_recovery_test.cc | 10 ++- .../cluster/archival/ntp_archiver_service.cc | 64 +++++++++++++++++-- src/v/cluster/archival/ntp_archiver_service.h | 14 ++++ .../archival/tests/ntp_archiver_test.cc | 4 +- .../cluster/archival/tests/service_fixture.cc | 9 ++- .../tests/cluster_recovery_backend_test.cc | 5 +- 9 files changed, 108 insertions(+), 19 deletions(-) diff --git a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc index c18f04b672369..6fad60e16ed02 100644 --- a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc +++ b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc @@ -247,7 +247,10 @@ TEST_P(EndToEndFixture, TestProduceConsumeFromCloudWithSpillover) { log->force_roll(ss::default_priority_class()).get(); ASSERT_TRUE(archiver.sync_for_tests().get()); - archiver.upload_next_candidates().get(); + archiver + .upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) + .get(); } ASSERT_EQ( cloud_storage::upload_result::success, @@ -737,7 +740,10 @@ TEST_F(CloudStorageManualMultiNodeTestBase, ReclaimableReportedInHealthReport) { // drive the uploading auto& archiver = prt_l->archiver()->get(); archiver.sync_for_tests().get(); - archiver.upload_next_candidates().get(); + archiver + .upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) + .get(); // not for synchronization... just to give the system time to propogate // all the state changes are are happening so that this overall loop diff --git a/src/v/cloud_storage/tests/delete_records_e2e_test.cc b/src/v/cloud_storage/tests/delete_records_e2e_test.cc index cfc0db79dc970..5186fde7ac740 100644 --- a/src/v/cloud_storage/tests/delete_records_e2e_test.cc +++ b/src/v/cloud_storage/tests/delete_records_e2e_test.cc @@ -417,7 +417,9 @@ FIXTURE_TEST( while (produced_kafka_base_offset > stm_manifest.get_next_kafka_offset()) { BOOST_REQUIRE(archiver->sync_for_tests().get()); BOOST_REQUIRE_EQUAL( - archiver->upload_next_candidates() + archiver + ->upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) .get() .non_compacted_upload_result.num_failed, 0); @@ -497,7 +499,9 @@ FIXTURE_TEST(test_delete_from_stm_truncation, delete_records_e2e_fixture) { // Upload more and truncate the manifest past the override. 
BOOST_REQUIRE(archiver->sync_for_tests().get()); BOOST_REQUIRE_EQUAL( - archiver->upload_next_candidates() + archiver + ->upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) .get() .non_compacted_upload_result.num_failed, 0); diff --git a/src/v/cloud_storage/tests/produce_utils.h b/src/v/cloud_storage/tests/produce_utils.h index b7464153a456b..94f50366d2f55 100644 --- a/src/v/cloud_storage/tests/produce_utils.h +++ b/src/v/cloud_storage/tests/produce_utils.h @@ -90,7 +90,8 @@ class remote_segment_generator { co_return -1; } if ( - (co_await archiver.upload_next_candidates()) + (co_await archiver.upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true})) .non_compacted_upload_result.num_failed > 0) { co_return -1; diff --git a/src/v/cloud_storage/tests/topic_namespace_override_recovery_test.cc b/src/v/cloud_storage/tests/topic_namespace_override_recovery_test.cc index 11359fdd8c67a..33984edd6451c 100644 --- a/src/v/cloud_storage/tests/topic_namespace_override_recovery_test.cc +++ b/src/v/cloud_storage/tests/topic_namespace_override_recovery_test.cc @@ -120,7 +120,10 @@ TEST_F(TopicRecoveryFixture, TestTopicNamespaceOverrideRecovery) { // Sync archiver, upload candidates (if needed) and upload manifest. archiver.sync_for_tests().get(); - std::ignore = archiver.upload_next_candidates().get(); + std::ignore = archiver + .upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) + .get(); archiver.upload_topic_manifest().get(); } @@ -188,7 +191,10 @@ TEST_F(TopicRecoveryFixture, TestTopicNamespaceOverrideRecovery) { .get()); archiver.sync_for_tests().get(); - std::ignore = archiver.upload_next_candidates().get(); + std::ignore = archiver + .upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) + .get(); // Check requests with the same predicate at end of scope, just to be // explicit about bad requests. diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index 3c65c0a81ca56..c9ed834322ea1 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -760,8 +760,19 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() { continue; } + // This is the offset of the last applied command. It is used + // as a fence to implement optimistic concurrency control. + auto fence + = _parent.archival_meta_stm()->manifest().get_applied_offset(); + vlog( + archival_log.debug, + "fence value is: {}, in-sync offset: {}", + fence, + _parent.archival_meta_stm()->get_insync_offset()); + auto [non_compacted_upload_result, compacted_upload_result] - = co_await upload_next_candidates(); + = co_await upload_next_candidates( + archival_stm_fence{.read_write_fence = fence}); if (non_compacted_upload_result.num_failed != 0) { // The logic in class `remote` already does retries: if we get here, // it means the upload failed after several retries, justifying @@ -2087,6 +2098,7 @@ ntp_archiver::schedule_uploads(std::vector loop_contexts) { } ss::future ntp_archiver::wait_uploads( + archival_stm_fence fence, std::vector scheduled, segment_upload_kind segment_kind, bool inline_manifest) { @@ -2165,6 +2177,21 @@ ss::future ntp_archiver::wait_uploads( = config::shard_local_cfg() .cloud_storage_disable_upload_consistency_checks.value(); if (!checks_disabled) { + // With read-write-fence it's guaranteed that the concurrent + // updates are not a problem. 
But we still need this check + // to prevent certain bugs from corrupting the cloud storage + // metadata. + // Overall, we're checking that the update makes sense here + // (that the new segment lines up with the previous one). Then + // we're checking that the segment actually matches its + // metadata. And then we're replicating the metadata with the + // fence that guarantees that no updates are made to the STM + // state interim. + // In other words, we're basing the decision to start an upload + // on the precondition. Then we're validating the actual uploads + // against this precondition and then we're discarding the + // changes to the STM state if the precondition is no longer + // valid. std::vector meta; for (size_t i = 0; i < segment_results.size(); i++) { meta.push_back(scheduled[ixupload[i]].meta.value()); @@ -2256,14 +2283,33 @@ ss::future ntp_archiver::wait_uploads( ? _parent.highest_producer_id() : model::producer_id{}; - auto error = co_await _parent.archival_meta_stm()->add_segments( + auto batch_builder = _parent.archival_meta_stm()->batch_start( + deadline, _as); + if (!fence.unsafe_add) { + // The fence should be added first because it can only + // affect commands which are following it in the same record + // batch. + vlog( + archival_log.debug, + "fence value is: {}, unsafe add: {}, manifest last " + "applied offset: {}, manifest in-sync offset: {}", + fence.read_write_fence, + fence.unsafe_add, + _parent.archival_meta_stm()->manifest().get_applied_offset(), + _parent.archival_meta_stm()->get_insync_offset()); + batch_builder.read_write_fence(fence.read_write_fence); + } + batch_builder.add_segments( mdiff, - manifest_clean_offset, - highest_producer_id, - deadline, - _as, checks_disabled ? cluster::segment_validated::no : cluster::segment_validated::yes); + if (manifest_clean_offset.has_value()) { + batch_builder.mark_clean(manifest_clean_offset.value()); + } + batch_builder.update_highest_producer_id(highest_producer_id); + + auto error = co_await batch_builder.replicate(); + if ( error != cluster::errc::success && error != cluster::errc::not_leader) { @@ -2303,6 +2349,7 @@ ss::future ntp_archiver::wait_uploads( } ss::future ntp_archiver::wait_all_scheduled_uploads( + archival_stm_fence fence, std::vector scheduled) { // Split the set of scheduled uploads into compacted and non compacted // uploads, and then wait for them separately. They can also be waited on @@ -2341,10 +2388,12 @@ ss::future ntp_archiver::wait_all_scheduled_uploads( auto [non_compacted_result, compacted_result] = co_await ss::when_all_succeed( wait_uploads( + fence, std::move(non_compacted_uploads), segment_upload_kind::non_compacted, inline_manifest_in_non_compacted_uploads), wait_uploads( + fence, std::move(compacted_uploads), segment_upload_kind::compacted, !inline_manifest_in_non_compacted_uploads)); @@ -2374,6 +2423,7 @@ model::offset ntp_archiver::max_uploadable_offset_exclusive() const { } ss::future ntp_archiver::upload_next_candidates( + archival_stm_fence fence, std::optional unsafe_max_offset_override_exclusive) { auto max_offset_exclusive = unsafe_max_offset_override_exclusive ? 
*unsafe_max_offset_override_exclusive @@ -2389,7 +2439,7 @@ ss::future ntp_archiver::upload_next_candidates( auto scheduled_uploads = co_await schedule_uploads( max_offset_exclusive); co_return co_await wait_all_scheduled_uploads( - std::move(scheduled_uploads)); + fence, std::move(scheduled_uploads)); } catch (const ss::gate_closed_exception&) { } catch (const ss::abort_requested_exception&) { } diff --git a/src/v/cluster/archival/ntp_archiver_service.h b/src/v/cluster/archival/ntp_archiver_service.h index bc1456cc1a21d..36b29ac58db5b 100644 --- a/src/v/cluster/archival/ntp_archiver_service.h +++ b/src/v/cluster/archival/ntp_archiver_service.h @@ -114,6 +114,17 @@ enum class wait_result { not_in_progress, complete, lost_leadership, failed }; std::ostream& operator<<(std::ostream& os, wait_result wr); +/// Fence value for the archival STM. +/// The value is used to implement optimistic +/// concurrency control. +struct archival_stm_fence { + // Offset of the last command added to the + // archival STM + model::offset read_write_fence; + // Disable fencing in tests + bool unsafe_add{false}; +}; + /// This class performs per-ntp archival workload. Every ntp can be /// processed independently, without the knowledge about others. All /// 'ntp_archiver' instances that the shard possesses are supposed to be @@ -225,6 +236,7 @@ class ntp_archiver { /// offset. /// \return future that returns number of uploaded/failed segments virtual ss::future upload_next_candidates( + archival_stm_fence fence, std::optional unsafe_max_offset_override_exclusive = std::nullopt); @@ -498,6 +510,7 @@ class ntp_archiver { /// /// Update the probe and manifest ss::future wait_all_scheduled_uploads( + archival_stm_fence fence, std::vector scheduled); /// Waits for scheduled segment uploads. The uploaded segments could be @@ -505,6 +518,7 @@ class ntp_archiver { /// cases with the major difference being the probe updates done after /// the upload. 
ss::future wait_uploads( + archival_stm_fence fence, std::vector scheduled, segment_upload_kind segment_kind, bool inline_manifest); diff --git a/src/v/cluster/archival/tests/ntp_archiver_test.cc b/src/v/cluster/archival/tests/ntp_archiver_test.cc index afe20db72a2d4..5eb6d50796ba9 100644 --- a/src/v/cluster/archival/tests/ntp_archiver_test.cc +++ b/src/v/cluster/archival/tests/ntp_archiver_test.cc @@ -404,7 +404,9 @@ FIXTURE_TEST( }); retry_chain_node fib(never_abort); - auto res = archiver.upload_next_candidates().get(); + auto res = archiver + .upload_next_candidates(archival_stm_fence{.unsafe_add = true}) + .get(); auto&& [non_compacted_result, compacted_result] = res; BOOST_REQUIRE_EQUAL(non_compacted_result.num_succeeded, 0); diff --git a/src/v/cluster/archival/tests/service_fixture.cc b/src/v/cluster/archival/tests/service_fixture.cc index 411a99f53cf44..15bada91d9bf6 100644 --- a/src/v/cluster/archival/tests/service_fixture.cc +++ b/src/v/cluster/archival/tests/service_fixture.cc @@ -10,6 +10,7 @@ #include "cluster/archival/tests/service_fixture.h" +#include "archival/ntp_archiver_service.h" #include "base/seastarx.h" #include "bytes/iobuf.h" #include "bytes/iobuf_parser.h" @@ -557,7 +558,8 @@ archiver_fixture::do_upload_next( if (model::timeout_clock::now() > deadline) { co_return archival::ntp_archiver::batch_result{}; } - auto result = co_await archiver.upload_next_candidates(lso); + auto result = co_await archiver.upload_next_candidates( + archival_stm_fence{.unsafe_add = true}, lso); auto num_success = result.compacted_upload_result.num_succeeded + result.non_compacted_upload_result.num_succeeded; if (num_success > 0) { @@ -581,8 +583,9 @@ void archiver_fixture::upload_and_verify( tests::cooperative_spin_wait_with_timeout( 10s, [&archiver, expected, lso]() { - return archiver.upload_next_candidates(lso).then( - [expected](auto result) { return result == expected; }); + return archiver + .upload_next_candidates(archival_stm_fence{.unsafe_add = true}, lso) + .then([expected](auto result) { return result == expected; }); }) .get(); } diff --git a/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc b/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc index cdb7ac135e656..b9a47d43e4436 100644 --- a/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc +++ b/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc @@ -180,7 +180,10 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) { } auto& archiver = p->archiver().value().get(); archiver.sync_for_tests().get(); - auto res = archiver.upload_next_candidates().get(); + auto res = archiver + .upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) + .get(); ASSERT_GT(res.non_compacted_upload_result.num_succeeded, 0); archiver.upload_topic_manifest().get(); archiver.upload_manifest("test").get(); From d4f6bcabe7be31a5e64fb495172f4411401d8176 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Tue, 17 Dec 2024 17:28:59 -0500 Subject: [PATCH 03/10] cluster: Remove consistency check from the archival STM The check is no longer needed because the fencing mechanism now prevents concurrent updates. The validation is still performed when the segment is uploaded. The removed check was used as an optimistic concurrency control mechanism and is now replaced by the RW-fence. Also, deprecate the error code that the STM used to communicate the consistency violation to the caller. The error code is not used anywhere else.
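As an illustration of the behaviour change (mirroring the gtest hunk below, not new code in the patch), a batch carrying a stale fence is now rejected at replication time instead of the STM silently dropping the command:

    // Sketch only: a deliberately stale fence value simulates a concurrent update.
    auto err = co_await stm.batch_start(deadline, never_abort)
                 .read_write_fence(model::offset(33)) // stale fence
                 .add_segments(
                   std::move(segments), cluster::segment_validated::yes)
                 .replicate();
    // err == cluster::errc::concurrent_modification_error, whereas the old code
    // path reported errc::inconsistent_stm_update from apply_add_segment.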
Signed-off-by: Evgeny Lazin <4lazin@gmail.com> --- .../cluster/archival/archival_metadata_stm.cc | 24 ------------------- .../tests/archival_metadata_stm_gtest.cc | 4 +++- src/v/cluster/errc.h | 4 +--- src/v/cluster/errors.cc | 2 -- src/v/config/configuration.cc | 8 +------ src/v/kafka/server/errors.h | 1 - 6 files changed, 5 insertions(+), 38 deletions(-) diff --git a/src/v/cluster/archival/archival_metadata_stm.cc b/src/v/cluster/archival/archival_metadata_stm.cc index f6b8a6ade4d69..6b7dd9c1ad3ad 100644 --- a/src/v/cluster/archival/archival_metadata_stm.cc +++ b/src/v/cluster/archival/archival_metadata_stm.cc @@ -1404,30 +1404,6 @@ void archival_metadata_stm::maybe_notify_waiter(std::exception_ptr e) noexcept { void archival_metadata_stm::apply_add_segment(const segment& segment) { auto meta = segment.meta; - bool disable_safe_add - = config::shard_local_cfg() - .cloud_storage_disable_metadata_consistency_checks.value(); - if ( - !disable_safe_add && segment.is_validated == segment_validated::yes - && !_manifest->safe_segment_meta_to_add(meta)) { - // We're only validating segment metadata records if they're validated. - // It goes like this - // - npt_archiver_service validates segment_meta instances before - // replication - // - replicated add_segment commands have 'is_validated' field set to - // 'yes' - // - old records in the log have 'is_validated' field set to 'no' - // - the 'apply_add_segment' will only validate new commands and add old - // ones unconditionally - auto last = _manifest->last_segment(); - vlog( - _logger.error, - "Can't add segment: {}, previous segment: {}", - meta, - last); - maybe_notify_waiter(errc::inconsistent_stm_update); - return; - } if (meta.ntp_revision == model::initial_revision_id{}) { // metadata serialized by old versions of redpanda doesn't have the // ntp_revision field. 
diff --git a/src/v/cluster/archival/tests/archival_metadata_stm_gtest.cc b/src/v/cluster/archival/tests/archival_metadata_stm_gtest.cc index 4b02fb1882a1b..00d7ac30a7273 100644 --- a/src/v/cluster/archival/tests/archival_metadata_stm_gtest.cc +++ b/src/v/cluster/archival/tests/archival_metadata_stm_gtest.cc @@ -550,10 +550,12 @@ TEST_F_CORO( repl_err = co_await get_leader_stm() .batch_start(deadline, never_abort) + // Invalid rw-fence to simulate concurent update + .read_write_fence(model::offset(33)) .add_segments( std::move(poisoned_segment), cluster::segment_validated::yes) .replicate(); - ASSERT_EQ_CORO(repl_err, cluster::errc::inconsistent_stm_update); + ASSERT_EQ_CORO(repl_err, cluster::errc::concurrent_modification_error); // Check that it still works with consistent updates good_segment.clear(); diff --git a/src/v/cluster/errc.h b/src/v/cluster/errc.h index 7704845ad5b11..9787d19432b88 100644 --- a/src/v/cluster/errc.h +++ b/src/v/cluster/errc.h @@ -81,7 +81,7 @@ enum class errc : int16_t { transform_count_limit_exceeded, role_exists, role_does_not_exist, - inconsistent_stm_update, + inconsistent_stm_update [[deprecated]], waiting_for_shard_placement_update, topic_invalid_partitions_core_limit, topic_invalid_partitions_memory_limit, @@ -255,8 +255,6 @@ struct errc_category final : public std::error_category { return "Role already exists"; case errc::role_does_not_exist: return "Role does not exist"; - case errc::inconsistent_stm_update: - return "STM command can't be applied"; case errc::waiting_for_shard_placement_update: return "Waiting for shard placement table update to finish"; case errc::topic_invalid_partitions_core_limit: diff --git a/src/v/cluster/errors.cc b/src/v/cluster/errors.cc index c1427f37453e4..e7c54599051f8 100644 --- a/src/v/cluster/errors.cc +++ b/src/v/cluster/errors.cc @@ -150,8 +150,6 @@ std::ostream& operator<<(std::ostream& o, cluster::errc err) { return o << "cluster::errc::role_exists"; case errc::role_does_not_exist: return o << "cluster::errc::role_does_not_exist"; - case errc::inconsistent_stm_update: - return o << "cluster::errc::inconsistent_stm_update"; case errc::waiting_for_shard_placement_update: return o << "cluster::errc::waiting_for_shard_placement_update"; case errc::topic_invalid_partitions_core_limit: diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 3fd837c66db99..56f5af8ad4f53 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2399,13 +2399,7 @@ configuration::configuration() {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, false) , cloud_storage_disable_metadata_consistency_checks( - *this, - "cloud_storage_disable_metadata_consistency_checks", - "Disable all metadata consistency checks. This will allow redpanda to " - "replay logs with inconsistent tiered-storage metadata. 
Normally, this " - "option should be disabled.", - {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, - true) + *this, "cloud_storage_disable_metadata_consistency_checks") , cloud_storage_hydration_timeout_ms( *this, "cloud_storage_hydration_timeout_ms", diff --git a/src/v/kafka/server/errors.h b/src/v/kafka/server/errors.h index a4c4d4301fcf3..ab4c4ec981923 100644 --- a/src/v/kafka/server/errors.h +++ b/src/v/kafka/server/errors.h @@ -105,7 +105,6 @@ constexpr error_code map_topic_error_code(cluster::errc code) { case cluster::errc::transform_count_limit_exceeded: case cluster::errc::role_exists: case cluster::errc::role_does_not_exist: - case cluster::errc::inconsistent_stm_update: case cluster::errc::waiting_for_shard_placement_update: case cluster::errc::producer_ids_vcluster_limit_exceeded: case cluster::errc::validation_of_recovery_topic_failed: From 0860f7004ae630a63837175f0a8f7153d543fb63 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Tue, 17 Dec 2024 17:30:48 -0500 Subject: [PATCH 04/10] config: Deprecate cloud_storage_disable_metadata_consistency_checks The option was never turned on by default. Our plan was to eventually enable it but it is now replaced with fencing mechanism which is always on and can't be disabled. Signed-off-by: Evgeny Lazin <4lazin@gmail.com> --- src/v/config/configuration.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 4ccfac65e40fb..240fd1d5bcef9 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -468,7 +468,7 @@ struct configuration final : public config_store { property cloud_storage_topic_purge_grace_period_ms; property cloud_storage_disable_upload_consistency_checks; - property cloud_storage_disable_metadata_consistency_checks; + deprecated_property cloud_storage_disable_metadata_consistency_checks; property cloud_storage_hydration_timeout_ms; property cloud_storage_disable_remote_labels_for_tests; From 8b335484c5b2e84935933d4e88cc1db16367f16a Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Wed, 18 Dec 2024 14:07:42 -0500 Subject: [PATCH 05/10] archival: Use RW-fence in housekeeping This commit enables rw-fence mechanism for housekeeping reuploads. In case if the housekeeping is running on a stale version of the STM snapshot the update will be rejected. 
Signed-off-by: Evgeny Lazin <4lazin@gmail.com> --- .../archival/adjacent_segment_merger.cc | 29 ++++--- .../cluster/archival/ntp_archiver_service.cc | 86 ++++++++++++------- src/v/cluster/archival/ntp_archiver_service.h | 14 +-- 3 files changed, 81 insertions(+), 48 deletions(-) diff --git a/src/v/cluster/archival/adjacent_segment_merger.cc b/src/v/cluster/archival/adjacent_segment_merger.cc index 1809fb913029e..4694ec9754239 100644 --- a/src/v/cluster/archival/adjacent_segment_merger.cc +++ b/src/v/cluster/archival/adjacent_segment_merger.cc @@ -213,29 +213,32 @@ adjacent_segment_merger::run(run_quota_t quota) { const cloud_storage::partition_manifest& manifest) { return scan_manifest(local_start_offset, manifest); }; - auto [archiver_units, upl] = co_await _archiver.find_reupload_candidate( - scanner); - if (!upl.has_value()) { + auto find_res = co_await _archiver.find_reupload_candidate(scanner); + if (!find_res.locks.has_value()) { vlog(_ctxlog.debug, "No more upload candidates"); co_return result; } - vassert(archiver_units.has_value(), "Must take archiver units"); - auto next = model::next_offset(upl->candidate.final_offset); + vassert(find_res.units.has_value(), "Must take archiver units"); + auto next = model::next_offset(find_res.locks->candidate.final_offset); vlog( _ctxlog.debug, - "Going to upload segment {}, num source segments {}, last offset {}", - upl->candidate.exposed_name, - upl->candidate.sources.size(), - upl->candidate.final_offset); - for (const auto& src : upl->candidate.sources) { + "Going to upload segment {}, num source segments {}, last offset {}, " + "read-write-fence value: {}", + find_res.locks->candidate.exposed_name, + find_res.locks->candidate.sources.size(), + find_res.locks->candidate.final_offset, + find_res.read_write_fence.read_write_fence); + for (const auto& src : find_res.locks->candidate.sources) { vlog( _ctxlog.debug, "Local log segment {} found, size {}", src->filename(), src->size_bytes()); } + auto uploaded = co_await _archiver.upload( - std::move(*archiver_units), std::move(*upl), std::ref(_root_rtc)); + std::move(find_res), std::ref(_root_rtc)); + if (uploaded) { _last = next; result.status = run_status::ok; @@ -246,14 +249,14 @@ adjacent_segment_merger::run(run_quota_t quota) { result.remaining = result.remaining - run_quota_t{1}; vlog( _ctxlog.debug, - "Successfuly uploaded segment, new last offfset is {}", + "Successfully uploaded segment, new last offset is {}", _last); } else { // Upload failed result.status = run_status::failed; vlog( _ctxlog.debug, - "Failed to upload segment, last offfset is {}", + "Failed to upload segment, last offset is {}", _last); } } diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index c9ed834322ea1..337e5d526a315 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -3301,18 +3301,23 @@ ntp_archiver::get_housekeeping_jobs() { return res; } -ss::future, - std::optional>> +ss::future ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) { ss::gate::holder holder(_gate); + archival_stm_fence rw_fence{ + .read_write_fence + = _parent.archival_meta_stm()->manifest().get_applied_offset(), + .unsafe_add = false, + }; if (!may_begin_uploads()) { - co_return std::make_pair(std::nullopt, std::nullopt); + co_return find_reupload_candidate_result{ + std::nullopt, std::nullopt, {}}; } auto run = scanner(_parent.raft_start_offset(), manifest()); if (!run.has_value()) { vlog(_rtclog.debug, 
"Scan didn't resulted in upload candidate"); - co_return std::make_pair(std::nullopt, std::nullopt); + co_return find_reupload_candidate_result{ + std::nullopt, std::nullopt, {}}; } else { vlog(_rtclog.debug, "Scan result: {}", run); } @@ -3331,18 +3336,16 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) { auto candidate = co_await collector.make_upload_candidate( _conf->upload_io_priority, _conf->segment_upload_timeout()); - using ret_t = std::pair< - std::optional, - std::optional>; co_return ss::visit( candidate, - [](std::monostate) -> ret_t { + [](std::monostate) -> find_reupload_candidate_result { vassert( false, "unexpected default re-upload candidate creation result"); }, - [this, &run, units = std::move(units)]( - upload_candidate_with_locks& upload_candidate) mutable -> ret_t { + [this, &run, &rw_fence, units = std::move(units)]( + upload_candidate_with_locks& upload_candidate) mutable + -> find_reupload_candidate_result { if ( upload_candidate.candidate.content_length != run->meta.size_bytes @@ -3357,27 +3360,28 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) { "{}, run: {}", upload_candidate.candidate, run->meta); - return std::make_pair(std::nullopt, std::nullopt); + return {std::nullopt, std::nullopt, {}}; } - return std::make_pair( - std::move(units), std::move(upload_candidate)); + return {std::move(units), std::move(upload_candidate), rw_fence}; }, - [this](skip_offset_range& skip_offsets) -> ret_t { + [this]( + skip_offset_range& skip_offsets) -> find_reupload_candidate_result { vlog( _rtclog.warn, "Failed to make reupload candidate: {}", skip_offsets.reason); - return std::make_pair(std::nullopt, std::nullopt); + return {std::nullopt, std::nullopt, {}}; }, - [this](candidate_creation_error& error) -> ret_t { + [this]( + candidate_creation_error& error) -> find_reupload_candidate_result { const auto log_level = log_level_for_error(error); vlogl( _rtclog, log_level, "Failed to make reupload candidate: {}", error); - return std::make_pair(std::nullopt, std::nullopt); + return {std::nullopt, std::nullopt, {}}; }); } // segment_name exposed_name; @@ -3395,18 +3399,26 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) { = cloud_storage::partition_manifest::generate_remote_segment_name( run->meta); // Create a remote upload candidate - co_return std::make_pair( - std::move(units), upload_candidate_with_locks{std::move(candidate)}); + co_return find_reupload_candidate_result{ + std::move(units), + upload_candidate_with_locks{std::move(candidate)}, + rw_fence}; } ss::future ntp_archiver::upload( - ssx::semaphore_units archiver_units, - upload_candidate_with_locks upload_locks, + find_reupload_candidate_result find_res, std::optional> source_rtc) { ss::gate::holder holder(_gate); - auto units = std::move(archiver_units); - if (upload_locks.candidate.sources.size() > 0) { - co_return co_await do_upload_local(std::move(upload_locks), source_rtc); + if (!find_res.locks.has_value() || !find_res.units.has_value()) { + // The method shouldn't be called if this is the case + co_return false; + } + auto units = std::move(find_res.units); + if (find_res.locks->candidate.sources.size() > 0) { + co_return co_await do_upload_local( + find_res.read_write_fence, + std::move(find_res.locks.value()), + source_rtc); } // Currently, the uploading of remote segments is disabled and // the only reason why the list of locks is empty is truncation. 
@@ -3417,6 +3429,7 @@ ss::future ntp_archiver::upload( } ss::future ntp_archiver::do_upload_local( + archival_stm_fence fence, upload_candidate_with_locks upload_locks, std::optional> source_rtc) { if (!may_begin_uploads()) { @@ -3531,14 +3544,27 @@ ss::future ntp_archiver::do_upload_local( ? _parent.highest_producer_id() : model::producer_id{}; auto deadline = ss::lowres_clock::now() + _conf->manifest_upload_timeout(); - auto error = co_await _parent.archival_meta_stm()->add_segments( + + auto builder = _parent.archival_meta_stm()->batch_start(deadline, _as); + if (!fence.unsafe_add) { + vlog( + archival_log.debug, + "(2) fence value is: {}, unsafe add: {}, manifest last applied " + "offset: {}, manifest in-sync offset: {}", + fence.read_write_fence, + fence.unsafe_add, + _parent.archival_meta_stm()->manifest().get_applied_offset(), + _parent.archival_meta_stm()->get_insync_offset()); + builder.read_write_fence(fence.read_write_fence); + } + builder.add_segments( {meta}, - std::nullopt, - highest_producer_id, - deadline, - _as, checks_disabled ? cluster::segment_validated::no : cluster::segment_validated::yes); + builder.update_highest_producer_id(highest_producer_id); + + auto error = co_await builder.replicate(); + if (error != cluster::errc::success && error != cluster::errc::not_leader) { vlog( _rtclog.warn, diff --git a/src/v/cluster/archival/ntp_archiver_service.h b/src/v/cluster/archival/ntp_archiver_service.h index 36b29ac58db5b..cc1155152d9c5 100644 --- a/src/v/cluster/archival/ntp_archiver_service.h +++ b/src/v/cluster/archival/ntp_archiver_service.h @@ -336,6 +336,12 @@ class ntp_archiver { model::offset local_start_offset, const cloud_storage::partition_manifest& manifest)>; + struct find_reupload_candidate_result { + std::optional units; + std::optional locks; + archival_stm_fence read_write_fence; + }; + /// Find upload candidate /// /// Depending on the output of the 'scanner' the upload candidate @@ -345,9 +351,7 @@ class ntp_archiver { /// /// \param scanner is a user provided function used to find upload candidate /// \return {nullopt, nullopt} or the archiver lock and upload candidate - ss::future, - std::optional>> + ss::future find_reupload_candidate(manifest_scanner_t scanner); /** @@ -366,8 +370,7 @@ class ntp_archiver { * \return true on success and false otherwise */ ss::future upload( - ssx::semaphore_units archiver_units, - upload_candidate_with_locks candidate, + find_reupload_candidate_result find_res, std::optional> source_rtc); /// Return reference to partition manifest from archival STM @@ -445,6 +448,7 @@ class ntp_archiver { ss::future<> maybe_complete_flush(); ss::future do_upload_local( + archival_stm_fence fence, upload_candidate_with_locks candidate, std::optional> source_rtc); ss::future do_upload_remote( From e2bc7c4f01939e2d5e6c22981b143d72f36aad5b Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Fri, 20 Dec 2024 19:05:20 -0500 Subject: [PATCH 06/10] archival: Use rw-fence in retention/GC code Use rw-fence for both normal retention/GC and spillover. Currently, the upload metadata validation only checks added segments. Because of that there is still some room for concurrent updates during the housekeeping (GC or retention). With rw-fence the metadata updates related to retention or GC will become safer. 
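For illustration (condensed from the hunks below, not the literal code), every retention/GC path now follows the same pattern:

    // Sketch: capture the fence before deciding what to truncate or collect.
    archival_stm_fence fence{
      .read_write_fence
      = _parent.archival_meta_stm()->manifest().get_applied_offset(),
      .unsafe_add = false,
    };
    // ... compute the new start offset / the set of segments to delete ...
    auto builder = _parent.archival_meta_stm()->batch_start(deadline, _as);
    if (!fence.unsafe_add) {
        builder.read_write_fence(fence.read_write_fence);
    }
    builder.cleanup_metadata(); // or truncate(...) / cleanup_archive(...)
    auto error = co_await builder.replicate();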
Signed-off-by: Evgeny Lazin <4lazin@gmail.com> --- .../cluster/archival/ntp_archiver_service.cc | 73 +++++++++++++++++-- 1 file changed, 67 insertions(+), 6 deletions(-) diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index 337e5d526a315..f8131f7f67254 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -2625,6 +2625,13 @@ ss::future<> ntp_archiver::apply_archive_retention() { vlog(_rtclog.trace, "NTP is not collectable"); co_return; } + + archival_stm_fence fence = { + .read_write_fence + = _parent.archival_meta_stm()->manifest().get_applied_offset(), + .unsafe_add = false, + }; + std::optional retention_bytes = ntp_conf.retention_bytes(); std::optional retention_ms = ntp_conf.retention_duration(); @@ -2661,6 +2668,13 @@ ss::future<> ntp_archiver::apply_archive_retention() { auto deadline = ss::lowres_clock::now() + sync_timeout; auto batch = _parent.archival_meta_stm()->batch_start(deadline, _as); + if (!fence.unsafe_add) { + vlog( + _rtclog.debug, + "apply_archive_retention, read_write_fence: {}", + fence.read_write_fence); + batch.read_write_fence(fence.read_write_fence); + } batch.truncate_archive_init(res.value().offset, res.value().delta); auto error = co_await batch.replicate(); @@ -2682,6 +2696,11 @@ ss::future<> ntp_archiver::garbage_collect_archive() { if (!may_begin_uploads()) { co_return; } + archival_stm_fence fence = { + .read_write_fence + = _parent.archival_meta_stm()->manifest().get_applied_offset(), + .unsafe_add = false, + }; auto backlog = co_await _manifest_view->get_retention_backlog(); if (backlog.has_failure()) { if (backlog.error() == cloud_storage::error_outcome::shutting_down) { @@ -2864,8 +2883,16 @@ ss::future<> ntp_archiver::garbage_collect_archive() { auto sync_timeout = config::shard_local_cfg() .cloud_storage_metadata_sync_timeout_ms.value(); auto deadline = ss::lowres_clock::now() + sync_timeout; - auto error = co_await _parent.archival_meta_stm()->cleanup_archive( - new_clean_offset, bytes_to_remove, deadline, _as); + auto builder = _parent.archival_meta_stm()->batch_start(deadline, _as); + if (!fence.unsafe_add) { + vlog( + _rtclog.debug, + "garbage_collect_archive, read_write_fence: {}", + fence.read_write_fence); + builder.read_write_fence(fence.read_write_fence); + } + builder.cleanup_archive(new_clean_offset, bytes_to_remove); + auto error = co_await builder.replicate(); if (error != cluster::errc::success) { vlog( @@ -3118,6 +3145,11 @@ ss::future<> ntp_archiver::apply_retention() { if (!may_begin_uploads()) { co_return; } + archival_stm_fence fence = { + .read_write_fence + = _parent.archival_meta_stm()->manifest().get_applied_offset(), + .unsafe_add = false, + }; auto arch_so = manifest().get_archive_start_offset(); auto stm_so = manifest().get_start_offset(); if (arch_so != model::offset{} && arch_so != stm_so) { @@ -3164,8 +3196,22 @@ ss::future<> ntp_archiver::apply_retention() { auto sync_timeout = config::shard_local_cfg() .cloud_storage_metadata_sync_timeout_ms.value(); auto deadline = ss::lowres_clock::now() + sync_timeout; - auto error = co_await _parent.archival_meta_stm()->truncate( - *next_start_offset, deadline, _as); + + auto builder = _parent.archival_meta_stm()->batch_start(deadline, _as); + if (!fence.unsafe_add) { + // Currently, the 'unsafe_add' is always set to 'false' + // because the fence is generated inside this method. It's still + // good to have this condition in case if this will be changed. 
+ vlog( + _rtclog.debug, + "apply_retention, read_write_fence {}", + fence.read_write_fence); + builder.read_write_fence(fence.read_write_fence); + } + builder.truncate(*next_start_offset); + + auto error = co_await builder.replicate(); + if (error != cluster::errc::success) { vlog( _rtclog.warn, @@ -3194,6 +3240,12 @@ ss::future<> ntp_archiver::garbage_collect() { co_return; } + archival_stm_fence fence = { + .read_write_fence + = _parent.archival_meta_stm()->manifest().get_applied_offset(), + .unsafe_add = false, + }; + // If we are about to delete segments, we must ensure that the remote // manifest is fully up to date, so that it is definitely not referring // to any of the segments we will delete in its list of active segments. @@ -3254,8 +3306,17 @@ ss::future<> ntp_archiver::garbage_collect() { auto sync_timeout = config::shard_local_cfg() .cloud_storage_metadata_sync_timeout_ms.value(); auto deadline = ss::lowres_clock::now() + sync_timeout; - auto error = co_await _parent.archival_meta_stm()->cleanup_metadata( - deadline, _as); + + auto builder = _parent.archival_meta_stm()->batch_start(deadline, _as); + if (!fence.unsafe_add) { + vlog( + _rtclog.debug, + "garbage_collect, read-write fence: {}", + fence.read_write_fence); + builder.read_write_fence(fence.read_write_fence); + } + builder.cleanup_metadata(); + auto error = co_await builder.replicate(); if (error != cluster::errc::success) { vlog( From df0cbd9b7dd49fff009e71b047a374ad9a9ecd0f Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Sat, 21 Dec 2024 09:28:28 -0500 Subject: [PATCH 07/10] fixup! update first ntp-archiver commit (log formatting changed) --- src/v/cluster/archival/ntp_archiver_service.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index f8131f7f67254..a94e7b85c9717 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -2291,7 +2291,8 @@ ss::future ntp_archiver::wait_uploads( // batch. vlog( archival_log.debug, - "fence value is: {}, unsafe add: {}, manifest last " + "add_segments, read-write fence: {}, unsafe add: {}, manifest " + "last " "applied offset: {}, manifest in-sync offset: {}", fence.read_write_fence, fence.unsafe_add, From 669c67218b4c592808c93d65da220596939d93a5 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Sat, 21 Dec 2024 09:29:02 -0500 Subject: [PATCH 08/10] fixup! 
update commit that enables rw-fence in housekeeping (log formatting) --- src/v/cluster/archival/ntp_archiver_service.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index a94e7b85c9717..1cdb1cfe4fe56 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -2672,7 +2672,7 @@ ss::future<> ntp_archiver::apply_archive_retention() { if (!fence.unsafe_add) { vlog( _rtclog.debug, - "apply_archive_retention, read_write_fence: {}", + "truncate_archive_init, read-write fence: {}", fence.read_write_fence); batch.read_write_fence(fence.read_write_fence); } @@ -2888,7 +2888,7 @@ ss::future<> ntp_archiver::garbage_collect_archive() { if (!fence.unsafe_add) { vlog( _rtclog.debug, - "garbage_collect_archive, read_write_fence: {}", + "cleanup_archive, read-write fence: {}", fence.read_write_fence); builder.read_write_fence(fence.read_write_fence); } @@ -3205,7 +3205,7 @@ ss::future<> ntp_archiver::apply_retention() { // good to have this condition in case if this will be changed. vlog( _rtclog.debug, - "apply_retention, read_write_fence {}", + "truncate, read-write fence: {}", fence.read_write_fence); builder.read_write_fence(fence.read_write_fence); } @@ -3312,7 +3312,7 @@ ss::future<> ntp_archiver::garbage_collect() { if (!fence.unsafe_add) { vlog( _rtclog.debug, - "garbage_collect, read-write fence: {}", + "cleanup_metadata, read-write fence: {}", fence.read_write_fence); builder.read_write_fence(fence.read_write_fence); } From 8d5bdfd981ece12d9a2c51e3053965c649e47804 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Sat, 21 Dec 2024 09:29:08 -0500 Subject: [PATCH 09/10] archival: Add debug logging (temporary) to archival STM Signed-off-by: Evgeny Lazin <4lazin@gmail.com> --- .../cluster/archival/archival_metadata_stm.cc | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/v/cluster/archival/archival_metadata_stm.cc b/src/v/cluster/archival/archival_metadata_stm.cc index 6b7dd9c1ad3ad..1cd91bb578851 100644 --- a/src/v/cluster/archival/archival_metadata_stm.cc +++ b/src/v/cluster/archival/archival_metadata_stm.cc @@ -1041,6 +1041,11 @@ ss::future archival_metadata_stm::do_add_segments( } ss::future<> archival_metadata_stm::do_apply(const model::record_batch& b) { + /*TODO: remove*/ vlog( + _log.debug, + "NEEDLE do_apply applying batch {} ({})", + b.base_offset(), + b.header().type); if ( b.header().type != model::record_batch_type::archival_metadata && b.header().type != model::record_batch_type::prefix_truncate) { @@ -1071,14 +1076,24 @@ ss::future<> archival_metadata_stm::do_apply(const model::record_batch& b) { } }); } else { - auto on_exit = ss::defer( - [this] { maybe_notify_waiter(errc::success); }); + /*TODO: remove*/ vlog(_log.debug, "NEEDLE do_apply called"); + auto on_exit = ss::defer([this] { + maybe_notify_waiter(errc::success); + /*TODO: remove*/ vlog(_log.debug, "NEEDLE do_apply completed"); + }); try { b.for_each_record([this, base_offset = b.base_offset()]( model::record&& r) { auto key = serde::from_iobuf(r.release_key()); + /*TODO: remove*/ vlog( + _log.debug, "NEEDLE do_apply command key {}", key); + if (key != read_write_fence_cmd::key) { + /*TODO: remove*/ vlog( + _log.debug, + "NEEDLE do_apply advance applied offset: {}", + base_offset + model::offset{r.offset_delta()}); _manifest->advance_applied_offset( base_offset + 
model::offset{r.offset_delta()}); } @@ -1152,6 +1167,10 @@ ss::future<> archival_metadata_stm::do_apply(const model::record_batch& b) { if (apply_read_write_fence( serde::from_iobuf( r.release_value()))) { + /*TODO: remove*/ vlog( + _log.debug, + "NEEDLE do_apply rw-fence concurrency violation {}", + r.release_value()); // This means that there is a concurrency violation. The // fence was created before some other command was // applied. We can't apply the commands from this batch. From 9b0a7489e69f45c115919640095a10acccb30866 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Sun, 22 Dec 2024 07:52:54 -0500 Subject: [PATCH 10/10] archival: Add rw-fence feature flag Previous implementation of the archival STM had a bug because of which the last applied offset wasn't added to the snapshot. This could potentially make replicas diverge. The solution is to add a feature flag and use the rw-fence mechanism only if all replicas are upgraded. Signed-off-by: Evgeny Lazin <4lazin@gmail.com> --- .../cluster/archival/ntp_archiver_service.cc | 34 +++++++++++++++---- src/v/features/feature_table.cc | 2 ++ src/v/features/feature_table.h | 1 + 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index 1cdb1cfe4fe56..ad7b5020b0557 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -771,8 +771,13 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() { _parent.archival_meta_stm()->get_insync_offset()); auto [non_compacted_upload_result, compacted_upload_result] - = co_await upload_next_candidates( - archival_stm_fence{.read_write_fence = fence}); + = co_await upload_next_candidates(archival_stm_fence{ + .read_write_fence = fence, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. + .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), + }); if (non_compacted_upload_result.num_failed != 0) { // The logic in class `remote` already does retries: if we get here, // it means the upload failed after several retries, justifying @@ -2630,7 +2635,10 @@ ss::future<> ntp_archiver::apply_archive_retention() { archival_stm_fence fence = { .read_write_fence = _parent.archival_meta_stm()->manifest().get_applied_offset(), - .unsafe_add = false, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. + .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), }; std::optional retention_bytes = ntp_conf.retention_bytes(); @@ -2700,7 +2708,10 @@ ss::future<> ntp_archiver::garbage_collect_archive() { archival_stm_fence fence = { .read_write_fence = _parent.archival_meta_stm()->manifest().get_applied_offset(), - .unsafe_add = false, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. + .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), }; auto backlog = co_await _manifest_view->get_retention_backlog(); if (backlog.has_failure()) { @@ -3149,7 +3160,10 @@ ss::future<> ntp_archiver::apply_retention() { archival_stm_fence fence = { .read_write_fence = _parent.archival_meta_stm()->manifest().get_applied_offset(), - .unsafe_add = false, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. 
+ .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), }; auto arch_so = manifest().get_archive_start_offset(); auto stm_so = manifest().get_start_offset(); @@ -3244,7 +3258,10 @@ ss::future<> ntp_archiver::garbage_collect() { archival_stm_fence fence = { .read_write_fence = _parent.archival_meta_stm()->manifest().get_applied_offset(), - .unsafe_add = false, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. + .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), }; // If we are about to delete segments, we must ensure that the remote @@ -3369,7 +3386,10 @@ ntp_archiver::find_reupload_candidate(manifest_scanner_t scanner) { archival_stm_fence rw_fence{ .read_write_fence = _parent.archival_meta_stm()->manifest().get_applied_offset(), - .unsafe_add = false, + // Only use the rw-fence if the feature is enabled which requires + // major version upgrade. + .unsafe_add = !_feature_table.local().is_active( + features::feature::cloud_storage_metadata_rw_fence), }; if (!may_begin_uploads()) { co_return find_reupload_candidate_result{ diff --git a/src/v/features/feature_table.cc b/src/v/features/feature_table.cc index eae3596b6c599..cc16faf59a123 100644 --- a/src/v/features/feature_table.cc +++ b/src/v/features/feature_table.cc @@ -98,6 +98,8 @@ std::string_view to_string_view(feature f) { return "datalake_iceberg"; case feature::raft_symmetric_reconfiguration_cancel: return "raft_symmetric_reconfiguration_cancel"; + case feature::cloud_storage_metadata_rw_fence: + return "cloud_storage_metadata_rw_fence"; /* * testing features diff --git a/src/v/features/feature_table.h b/src/v/features/feature_table.h index c6fc643a6aa0c..98116cb694779 100644 --- a/src/v/features/feature_table.h +++ b/src/v/features/feature_table.h @@ -69,6 +69,7 @@ enum class feature : std::uint64_t { shadow_indexing_split_topic_property_update = 1ULL << 53U, datalake_iceberg = 1ULL << 54U, raft_symmetric_reconfiguration_cancel = 1ULL << 55U, + cloud_storage_metadata_rw_fence = 1ULL << 56U, // Dummy features for testing only test_alpha = 1ULL << 61U, test_bravo = 1ULL << 62U,