From 63eb5f5a5b013b3c725a95bf87480a5d2c0b9c12 Mon Sep 17 00:00:00 2001 From: Evgeny Lazin <4lazin@gmail.com> Date: Sat, 14 Dec 2024 13:48:42 -0500 Subject: [PATCH] archival: Use read-write fence in the ntp_archiver The fence is a value produced by recording the offset of the last applied archival STM command. The fence is added to the archival STM configuration batch. If no command was applied to the STM in the meantime, the offset of the last applied command is unchanged and the configuration batch is applied. Otherwise, if some command was applied after the record batch was constructed, the batch is rejected. This is essentially an optimistic concurrency control mechanism. The fence commands were already present in the STM but have not been used until now. The existing metadata validation mechanism is disabled; we now only check that the actual segment data matches expectations. --- .../tests/cloud_storage_e2e_test.cc | 10 +- .../tests/delete_records_e2e_test.cc | 8 +- src/v/cloud_storage/tests/produce_utils.h | 3 +- .../topic_namespace_override_recovery_test.cc | 10 +- .../cluster/archival/ntp_archiver_service.cc | 73 +++++++----- src/v/cluster/archival/ntp_archiver_service.h | 14 +++ .../archival/tests/ntp_archiver_test.cc | 109 +----------------- .../cluster/archival/tests/service_fixture.cc | 9 +- .../tests/cluster_recovery_backend_test.cc | 5 +- 9 files changed, 93 insertions(+), 148 deletions(-) diff --git a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc index c18f04b672369..6fad60e16ed02 100644 --- a/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc +++ b/src/v/cloud_storage/tests/cloud_storage_e2e_test.cc @@ -247,7 +247,10 @@ TEST_P(EndToEndFixture, TestProduceConsumeFromCloudWithSpillover) { log->force_roll(ss::default_priority_class()).get(); ASSERT_TRUE(archiver.sync_for_tests().get()); - archiver.upload_next_candidates().get(); + archiver + .upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) + .get(); } ASSERT_EQ( cloud_storage::upload_result::success, @@ -737,7 +740,10 @@ TEST_F(CloudStorageManualMultiNodeTestBase, ReclaimableReportedInHealthReport) { // drive the uploading auto& archiver = prt_l->archiver()->get(); archiver.sync_for_tests().get(); - archiver.upload_next_candidates().get(); + archiver + .upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) + .get(); // not for synchronization... just to give the system time to propagate // all the state changes that are happening so that this overall loop diff --git a/src/v/cloud_storage/tests/delete_records_e2e_test.cc b/src/v/cloud_storage/tests/delete_records_e2e_test.cc index cfc0db79dc970..5186fde7ac740 100644 --- a/src/v/cloud_storage/tests/delete_records_e2e_test.cc +++ b/src/v/cloud_storage/tests/delete_records_e2e_test.cc @@ -417,7 +417,9 @@ FIXTURE_TEST( while (produced_kafka_base_offset > stm_manifest.get_next_kafka_offset()) { BOOST_REQUIRE(archiver->sync_for_tests().get()); BOOST_REQUIRE_EQUAL( - archiver->upload_next_candidates() + archiver + ->upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) .get() .non_compacted_upload_result.num_failed, 0); @@ -497,7 +499,9 @@ FIXTURE_TEST(test_delete_from_stm_truncation, delete_records_e2e_fixture) { // Upload more and truncate the manifest past the override. 
BOOST_REQUIRE(archiver->sync_for_tests().get()); BOOST_REQUIRE_EQUAL( - archiver->upload_next_candidates() + archiver + ->upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) .get() .non_compacted_upload_result.num_failed, 0); diff --git a/src/v/cloud_storage/tests/produce_utils.h b/src/v/cloud_storage/tests/produce_utils.h index b7464153a456b..94f50366d2f55 100644 --- a/src/v/cloud_storage/tests/produce_utils.h +++ b/src/v/cloud_storage/tests/produce_utils.h @@ -90,7 +90,8 @@ class remote_segment_generator { co_return -1; } if ( - (co_await archiver.upload_next_candidates()) + (co_await archiver.upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true})) .non_compacted_upload_result.num_failed > 0) { co_return -1; diff --git a/src/v/cloud_storage/tests/topic_namespace_override_recovery_test.cc b/src/v/cloud_storage/tests/topic_namespace_override_recovery_test.cc index 11359fdd8c67a..33984edd6451c 100644 --- a/src/v/cloud_storage/tests/topic_namespace_override_recovery_test.cc +++ b/src/v/cloud_storage/tests/topic_namespace_override_recovery_test.cc @@ -120,7 +120,10 @@ TEST_F(TopicRecoveryFixture, TestTopicNamespaceOverrideRecovery) { // Sync archiver, upload candidates (if needed) and upload manifest. archiver.sync_for_tests().get(); - std::ignore = archiver.upload_next_candidates().get(); + std::ignore = archiver + .upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) + .get(); archiver.upload_topic_manifest().get(); } @@ -188,7 +191,10 @@ TEST_F(TopicRecoveryFixture, TestTopicNamespaceOverrideRecovery) { .get()); archiver.sync_for_tests().get(); - std::ignore = archiver.upload_next_candidates().get(); + std::ignore = archiver + .upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) + .get(); // Check requests with the same predicate at end of scope, just to be // explicit about bad requests. diff --git a/src/v/cluster/archival/ntp_archiver_service.cc b/src/v/cluster/archival/ntp_archiver_service.cc index 3c65c0a81ca56..cc2cf3476ec4b 100644 --- a/src/v/cluster/archival/ntp_archiver_service.cc +++ b/src/v/cluster/archival/ntp_archiver_service.cc @@ -760,8 +760,19 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() { continue; } + // This is the offset of the last applied command. It is used + // as a fence to implement optimistic concurrency control. 
+ auto fence = _parent.archival_meta_stm()->manifest().get_applied_offset(); + vlog( archival_log.debug, "fence value is: {}, in-sync offset: {}", fence, _parent.archival_meta_stm()->get_insync_offset()); + auto [non_compacted_upload_result, compacted_upload_result] - = co_await upload_next_candidates(); + = co_await upload_next_candidates( + archival_stm_fence{.read_write_fence = fence}); if (non_compacted_upload_result.num_failed != 0) { // The logic in class `remote` already does retries: if we get here, // it means the upload failed after several retries, justifying @@ -2087,6 +2098,7 @@ ntp_archiver::schedule_uploads(std::vector<upload_context> loop_contexts) { } ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads( + archival_stm_fence fence, std::vector<scheduled_upload> scheduled, segment_upload_kind segment_kind, bool inline_manifest) { @@ -2164,30 +2176,6 @@ ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads( const bool checks_disabled = config::shard_local_cfg() .cloud_storage_disable_upload_consistency_checks.value(); - if (!checks_disabled) { - std::vector<cloud_storage::segment_meta> meta; - for (size_t i = 0; i < segment_results.size(); i++) { - meta.push_back(scheduled[ixupload[i]].meta.value()); - } - size_t num_accepted = manifest().safe_segment_meta_to_add( - std::move(meta)); - if (num_accepted < segment_results.size()) { - vlog( - _rtclog.warn, - "Metadata inconsistency detected, {} segments uploaded but only " - "{} can be added", - segment_results.size(), - num_accepted); - _probe->gap_detected(model::offset( - static_cast<int64_t>(segment_results.size() - num_accepted))); - } - vassert( - num_accepted <= segment_results.size(), - "Accepted {} segments but only {} segments are uploaded", - num_accepted, - segment_results.size()); - segment_results.resize(num_accepted); - } for (size_t i = 0; i < segment_results.size(); i++) { if ( segment_results[i].result() @@ -2256,14 +2244,33 @@ ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads( ? _parent.highest_producer_id() : model::producer_id{}; - auto error = co_await _parent.archival_meta_stm()->add_segments( + auto batch_builder = _parent.archival_meta_stm()->batch_start( + deadline, _as); + if (!fence.unsafe_add) { + // The fence has to be added first because it only affects + // commands that follow it in the same record + // batch. + vlog( archival_log.debug, "fence value is: {}, unsafe add: {}, manifest last " "applied offset: {}, manifest in-sync offset: {}", fence.read_write_fence, fence.unsafe_add, _parent.archival_meta_stm()->manifest().get_applied_offset(), _parent.archival_meta_stm()->get_insync_offset()); + batch_builder.read_write_fence(fence.read_write_fence); + } + batch_builder.add_segments( mdiff, - manifest_clean_offset, - highest_producer_id, - deadline, - _as, checks_disabled ? cluster::segment_validated::no : cluster::segment_validated::yes); + if (manifest_clean_offset.has_value()) { + batch_builder.mark_clean(manifest_clean_offset.value()); + } + batch_builder.update_highest_producer_id(highest_producer_id); + + auto error = co_await batch_builder.replicate(); + if ( error != cluster::errc::success && error != cluster::errc::not_leader) { @@ -2303,6 +2310,7 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads( + archival_stm_fence fence, std::vector<scheduled_upload> scheduled) { // Split the set of scheduled uploads into compacted and non compacted // uploads, and then wait for them separately. 
They can also be waited on @@ -2341,10 +2349,12 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads( auto [non_compacted_result, compacted_result] = co_await ss::when_all_succeed( wait_uploads( + fence, std::move(non_compacted_uploads), segment_upload_kind::non_compacted, inline_manifest_in_non_compacted_uploads), wait_uploads( + fence, std::move(compacted_uploads), segment_upload_kind::compacted, !inline_manifest_in_non_compacted_uploads)); @@ -2374,6 +2384,7 @@ model::offset ntp_archiver::max_uploadable_offset_exclusive() const { } ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates( + archival_stm_fence fence, std::optional<model::offset> unsafe_max_offset_override_exclusive) { auto max_offset_exclusive = unsafe_max_offset_override_exclusive ? *unsafe_max_offset_override_exclusive @@ -2389,7 +2400,7 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates( auto scheduled_uploads = co_await schedule_uploads( max_offset_exclusive); co_return co_await wait_all_scheduled_uploads( - std::move(scheduled_uploads)); + fence, std::move(scheduled_uploads)); } catch (const ss::gate_closed_exception&) { } catch (const ss::abort_requested_exception&) { } diff --git a/src/v/cluster/archival/ntp_archiver_service.h b/src/v/cluster/archival/ntp_archiver_service.h index bc1456cc1a21d..36b29ac58db5b 100644 --- a/src/v/cluster/archival/ntp_archiver_service.h +++ b/src/v/cluster/archival/ntp_archiver_service.h @@ -114,6 +114,17 @@ enum class wait_result { not_in_progress, complete, lost_leadership, failed }; std::ostream& operator<<(std::ostream& os, wait_result wr); +/// Fence value for the archival STM. +/// The value is used to implement optimistic +/// concurrency control. +struct archival_stm_fence { + // Offset of the last command added to the + // archival STM + model::offset read_write_fence; + // Disable fencing in tests + bool unsafe_add{false}; +}; + /// This class performs per-ntp archival workload. Every ntp can be /// processed independently, without the knowledge about others. All /// 'ntp_archiver' instances that the shard possesses are supposed to be @@ -225,6 +236,7 @@ class ntp_archiver { /// offset. /// \return future that returns number of uploaded/failed segments virtual ss::future<batch_result> upload_next_candidates( + archival_stm_fence fence, std::optional<model::offset> unsafe_max_offset_override_exclusive = std::nullopt); @@ -498,6 +510,7 @@ class ntp_archiver { /// /// Update the probe and manifest ss::future<batch_result> wait_all_scheduled_uploads( + archival_stm_fence fence, std::vector<scheduled_upload> scheduled); /// Waits for scheduled segment uploads. The uploaded segments could be /// compacted or not compacted, the procedure is the same for both /// cases with the major difference being the probe updates done after /// the upload. 
ss::future<upload_group_result> wait_uploads( + archival_stm_fence fence, std::vector<scheduled_upload> scheduled, segment_upload_kind segment_kind, bool inline_manifest); diff --git a/src/v/cluster/archival/tests/ntp_archiver_test.cc b/src/v/cluster/archival/tests/ntp_archiver_test.cc index afe20db72a2d4..5b8dbb932ad56 100644 --- a/src/v/cluster/archival/tests/ntp_archiver_test.cc +++ b/src/v/cluster/archival/tests/ntp_archiver_test.cc @@ -404,7 +404,9 @@ FIXTURE_TEST( }); retry_chain_node fib(never_abort); - auto res = archiver.upload_next_candidates().get(); + auto res = archiver + .upload_next_candidates(archival_stm_fence{.unsafe_add = true}) + .get(); auto&& [non_compacted_result, compacted_result] = res; BOOST_REQUIRE_EQUAL(non_compacted_result.num_succeeded, 0); @@ -1861,111 +1863,6 @@ FIXTURE_TEST(test_manifest_spillover, archiver_fixture) { test_manifest_spillover_impl(*this, 0x1000, 0x3000); } -// NOLINTNEXTLINE -FIXTURE_TEST(test_upload_with_gap_blocked, archiver_fixture) { - std::vector<segment_desc> segments = { - {.ntp = manifest_ntp, - .base_offset = model::offset(0), - .term = model::term_id(1), - .num_records = 900}, - {.ntp = manifest_ntp, - .base_offset = model::offset(1000), - .term = model::term_id(4), - .num_records = 1000}, - }; - - init_storage_api_local(segments); - wait_for_partition_leadership(manifest_ntp); - - auto part = app.partition_manager.local().get(manifest_ntp); - tests::cooperative_spin_wait_with_timeout(10s, [part]() mutable { - return part->last_stable_offset() >= model::offset(1000); - }).get(); - - vlog( - test_log.info, - "Partition is a leader, high-watermark: {}, partition: {}", - part->high_watermark(), - *part); - - listen(); - - auto [arch_conf, remote_conf] = get_configurations(); - - auto manifest_view = ss::make_shared<cloud_storage::async_manifest_view>( - remote, - app.shadow_index_cache, - part->archival_meta_stm()->manifest(), - arch_conf->bucket_name, - path_provider); - - archival::ntp_archiver archiver( - get_ntp_conf(), - arch_conf, - remote.local(), - app.shadow_index_cache.local(), - *part, - manifest_view); - - auto action = ss::defer([&archiver, &manifest_view] { - archiver.stop().get(); - manifest_view->stop().get(); - }); - - retry_chain_node fib(never_abort); - auto res = upload_next_with_retries(archiver).get(); - - for (auto [url, req] : get_targets()) { - vlog(test_log.info, "{} {}", req.method, req.url); - } - - // The archiver will upload both segments successfully but will be - // able to add to the manifest only the first one. 
- BOOST_REQUIRE_EQUAL(res.non_compacted_upload_result.num_succeeded, 2); - BOOST_REQUIRE_EQUAL(res.non_compacted_upload_result.num_failed, 0); - BOOST_REQUIRE_EQUAL(res.non_compacted_upload_result.num_cancelled, 0); - BOOST_REQUIRE_EQUAL(res.compacted_upload_result.num_succeeded, 0); - BOOST_REQUIRE_EQUAL(res.compacted_upload_result.num_failed, 0); - BOOST_REQUIRE_EQUAL(res.compacted_upload_result.num_cancelled, 0); - - BOOST_REQUIRE_EQUAL(get_requests().size(), 5); - - cloud_storage::partition_manifest manifest; - { - BOOST_REQUIRE(get_targets().count(manifest_url)); // NOLINT - auto req_opt = get_latest_request(manifest_url); - BOOST_REQUIRE(req_opt.has_value()); - auto req = req_opt.value().get(); - BOOST_REQUIRE_EQUAL(req.method, "PUT"); // NOLINT - manifest = load_manifest(req.content); - BOOST_REQUIRE(manifest == part->archival_meta_stm()->manifest()); - } - - { - segment_name segment1_name{"0-1-v1.log"}; - auto segment1_url = get_segment_path(manifest, segment1_name); - auto req_opt = get_latest_request("/" + segment1_url().string()); - BOOST_REQUIRE(req_opt.has_value()); - auto req = req_opt.value().get(); - BOOST_REQUIRE_EQUAL(req.method, "PUT"); // NOLINT - verify_segment(manifest_ntp, segment1_name, req.content); - - auto index_url = get_segment_index_path(manifest, segment1_name); - auto index_req_maybe = get_latest_request("/" + index_url().string()); - BOOST_REQUIRE(index_req_maybe.has_value()); - auto index_req = index_req_maybe.value().get(); - BOOST_REQUIRE_EQUAL(index_req.method, "PUT"); - verify_index(manifest_ntp, segment1_name, manifest, index_req.content); - } - - // The stm manifest should have only the first segment - BOOST_REQUIRE(part->archival_meta_stm()); - const auto& stm_manifest = part->archival_meta_stm()->manifest(); - BOOST_REQUIRE_EQUAL(stm_manifest.size(), 1); - BOOST_REQUIRE_EQUAL( - stm_manifest.last_segment()->base_offset, segments[0].base_offset); -} - FIXTURE_TEST(test_flush_not_leader, cloud_storage_manual_multinode_test_base) { // start a second fixture and wait for stable setup auto fx2 = start_second_fixture(); diff --git a/src/v/cluster/archival/tests/service_fixture.cc b/src/v/cluster/archival/tests/service_fixture.cc index 411a99f53cf44..15bada91d9bf6 100644 --- a/src/v/cluster/archival/tests/service_fixture.cc +++ b/src/v/cluster/archival/tests/service_fixture.cc @@ -10,6 +10,7 @@ #include "cluster/archival/tests/service_fixture.h" +#include "archival/ntp_archiver_service.h" #include "base/seastarx.h" #include "bytes/iobuf.h" #include "bytes/iobuf_parser.h" @@ -557,7 +558,8 @@ archiver_fixture::do_upload_next( if (model::timeout_clock::now() > deadline) { co_return archival::ntp_archiver::batch_result{}; } - auto result = co_await archiver.upload_next_candidates(lso); + auto result = co_await archiver.upload_next_candidates( + archival_stm_fence{.unsafe_add = true}, lso); auto num_success = result.compacted_upload_result.num_succeeded + result.non_compacted_upload_result.num_succeeded; if (num_success > 0) { @@ -581,8 +583,9 @@ void archiver_fixture::upload_and_verify( tests::cooperative_spin_wait_with_timeout( 10s, [&archiver, expected, lso]() { - return archiver.upload_next_candidates(lso).then( - [expected](auto result) { return result == expected; }); + return archiver + .upload_next_candidates(archival_stm_fence{.unsafe_add = true}, lso) + .then([expected](auto result) { return result == expected; }); }) .get(); } diff --git a/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc 
b/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc index cdb7ac135e656..b9a47d43e4436 100644 --- a/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc +++ b/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc @@ -180,7 +180,10 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) { } auto& archiver = p->archiver().value().get(); archiver.sync_for_tests().get(); - auto res = archiver.upload_next_candidates().get(); + auto res = archiver + .upload_next_candidates( + archival::archival_stm_fence{.unsafe_add = true}) + .get(); ASSERT_GT(res.non_compacted_upload_result.num_succeeded, 0); archiver.upload_topic_manifest().get(); archiver.upload_manifest("test").get();
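
The change above is classic optimistic concurrency control: record the offset of the last applied STM command, attach it to the record batch as a fence, and have the state machine reject the batch if any other command was applied in between. The following standalone sketch models just that check; command, archival_stm_model, and try_apply are hypothetical stand-ins chosen for illustration, not the actual archival STM types or API.

// Minimal model of the read-write fence check (hypothetical types,
// not the real archival STM; compiles as a standalone program).
#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

using offset_t = int64_t;

struct command {
    // Fence recorded when the batch was built; nullopt means no fence
    // (analogous to the 'unsafe_add' test-only path in the patch).
    std::optional<offset_t> read_write_fence;
    int payload;
};

struct archival_stm_model {
    offset_t applied_offset{-1};
    std::vector<int> state;

    // Apply a batch only if no other command was applied since the
    // fence was read; otherwise reject the whole batch.
    bool try_apply(const std::vector<command>& batch) {
        if (
          !batch.empty() && batch.front().read_write_fence.has_value()
          && *batch.front().read_write_fence != applied_offset) {
            return false; // a concurrent writer got in first
        }
        for (const auto& c : batch) {
            state.push_back(c.payload);
            ++applied_offset;
        }
        return true;
    }
};

int main() {
    archival_stm_model stm;

    // Writer A records the fence, then writer B applies a command.
    offset_t fence_a = stm.applied_offset;
    stm.try_apply({{std::nullopt, 42}});

    // A's batch now fails the fence check instead of being applied on
    // top of state it has not observed.
    bool ok = stm.try_apply({{fence_a, 7}});
    std::cout << "batch applied: " << std::boolalpha << ok << '\n';
}

In the patch itself the same sequence runs through the STM batch builder: upload_until_term_change_legacy() reads the fence via get_applied_offset(), wait_uploads() installs it with batch_builder.read_write_fence() before add_segments(), and the replicated batch is rejected at apply time if another command moved the applied offset in the meantime.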