Skip to content

Commit

Permalink
archival: Use read-write fence in the ntp_archiver
Browse files Browse the repository at this point in the history
The fence is a value which is produced by recording the current offset
of the last applied archival STM command. The fence is added to the
archival STM config batch. If no commands were applied to the STM the
offset of the last applied command will be the same and the
configuration batch will be applied. Otherwise, if some command was
applied after the record batch was constructed the batch will not be
applied.

This is essentially a concurrency control mechamism. The commands are
already present in the STM but not used as for now. The existing
metadata validation mechanism is disabled. We're only checking that the
actual segment data matches expectation.
  • Loading branch information
Lazin committed Dec 17, 2024
1 parent 79350ec commit 63eb5f5
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 148 deletions.
10 changes: 8 additions & 2 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@ TEST_P(EndToEndFixture, TestProduceConsumeFromCloudWithSpillover) {
log->force_roll(ss::default_priority_class()).get();

ASSERT_TRUE(archiver.sync_for_tests().get());
archiver.upload_next_candidates().get();
archiver
.upload_next_candidates(
archival::archival_stm_fence{.unsafe_add = true})
.get();
}
ASSERT_EQ(
cloud_storage::upload_result::success,
Expand Down Expand Up @@ -737,7 +740,10 @@ TEST_F(CloudStorageManualMultiNodeTestBase, ReclaimableReportedInHealthReport) {
// drive the uploading
auto& archiver = prt_l->archiver()->get();
archiver.sync_for_tests().get();
archiver.upload_next_candidates().get();
archiver
.upload_next_candidates(
archival::archival_stm_fence{.unsafe_add = true})
.get();

// not for synchronization... just to give the system time to propogate
// all the state changes are are happening so that this overall loop
Expand Down
8 changes: 6 additions & 2 deletions src/v/cloud_storage/tests/delete_records_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,9 @@ FIXTURE_TEST(
while (produced_kafka_base_offset > stm_manifest.get_next_kafka_offset()) {
BOOST_REQUIRE(archiver->sync_for_tests().get());
BOOST_REQUIRE_EQUAL(
archiver->upload_next_candidates()
archiver
->upload_next_candidates(
archival::archival_stm_fence{.unsafe_add = true})
.get()
.non_compacted_upload_result.num_failed,
0);
Expand Down Expand Up @@ -497,7 +499,9 @@ FIXTURE_TEST(test_delete_from_stm_truncation, delete_records_e2e_fixture) {
// Upload more and truncate the manifest past the override.
BOOST_REQUIRE(archiver->sync_for_tests().get());
BOOST_REQUIRE_EQUAL(
archiver->upload_next_candidates()
archiver
->upload_next_candidates(
archival::archival_stm_fence{.unsafe_add = true})
.get()
.non_compacted_upload_result.num_failed,
0);
Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/produce_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class remote_segment_generator {
co_return -1;
}
if (
(co_await archiver.upload_next_candidates())
(co_await archiver.upload_next_candidates(
archival::archival_stm_fence{.unsafe_add = true}))
.non_compacted_upload_result.num_failed
> 0) {
co_return -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ TEST_F(TopicRecoveryFixture, TestTopicNamespaceOverrideRecovery) {

// Sync archiver, upload candidates (if needed) and upload manifest.
archiver.sync_for_tests().get();
std::ignore = archiver.upload_next_candidates().get();
std::ignore = archiver
.upload_next_candidates(
archival::archival_stm_fence{.unsafe_add = true})
.get();
archiver.upload_topic_manifest().get();
}

Expand Down Expand Up @@ -188,7 +191,10 @@ TEST_F(TopicRecoveryFixture, TestTopicNamespaceOverrideRecovery) {
.get());

archiver.sync_for_tests().get();
std::ignore = archiver.upload_next_candidates().get();
std::ignore = archiver
.upload_next_candidates(
archival::archival_stm_fence{.unsafe_add = true})
.get();

// Check requests with the same predicate at end of scope, just to be
// explicit about bad requests.
Expand Down
73 changes: 42 additions & 31 deletions src/v/cluster/archival/ntp_archiver_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -760,8 +760,19 @@ ss::future<> ntp_archiver::upload_until_term_change_legacy() {
continue;
}

// This is the offset of the last applied command. It is used
// as a fence to implement optimistic concurrency control.
auto fence
= _parent.archival_meta_stm()->manifest().get_applied_offset();
vlog(
archival_log.debug,
"fence value is: {}, in-sync offset: {}",
fence,
_parent.archival_meta_stm()->get_insync_offset());

auto [non_compacted_upload_result, compacted_upload_result]
= co_await upload_next_candidates();
= co_await upload_next_candidates(
archival_stm_fence{.read_write_fence = fence});
if (non_compacted_upload_result.num_failed != 0) {
// The logic in class `remote` already does retries: if we get here,
// it means the upload failed after several retries, justifying
Expand Down Expand Up @@ -2087,6 +2098,7 @@ ntp_archiver::schedule_uploads(std::vector<upload_context> loop_contexts) {
}

ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads(
archival_stm_fence fence,
std::vector<scheduled_upload> scheduled,
segment_upload_kind segment_kind,
bool inline_manifest) {
Expand Down Expand Up @@ -2164,30 +2176,6 @@ ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads(
const bool checks_disabled
= config::shard_local_cfg()
.cloud_storage_disable_upload_consistency_checks.value();
if (!checks_disabled) {
std::vector<cloud_storage::segment_meta> meta;
for (size_t i = 0; i < segment_results.size(); i++) {
meta.push_back(scheduled[ixupload[i]].meta.value());
}
size_t num_accepted = manifest().safe_segment_meta_to_add(
std::move(meta));
if (num_accepted < segment_results.size()) {
vlog(
_rtclog.warn,
"Metadata inconsistency detected, {} segments uploaded but only "
"{} can be added",
segment_results.size(),
num_accepted);
_probe->gap_detected(model::offset(
static_cast<int64_t>(segment_results.size() - num_accepted)));
}
vassert(
num_accepted <= segment_results.size(),
"Accepted {} segments but only {} segments are uploaded",
num_accepted,
segment_results.size());
segment_results.resize(num_accepted);
}
for (size_t i = 0; i < segment_results.size(); i++) {
if (
segment_results[i].result()
Expand Down Expand Up @@ -2256,14 +2244,33 @@ ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads(
? _parent.highest_producer_id()
: model::producer_id{};

auto error = co_await _parent.archival_meta_stm()->add_segments(
auto batch_builder = _parent.archival_meta_stm()->batch_start(
deadline, _as);
if (!fence.unsafe_add) {
// The fence should be added first because it can only
// affect commands which are following it in the same record
// batch.
vlog(
archival_log.debug,
"fence value is: {}, unsafe add: {}, manifest last "
"applied offset: {}, manifest in-sync offset: {}",
fence.read_write_fence,
fence.unsafe_add,
_parent.archival_meta_stm()->manifest().get_applied_offset(),
_parent.archival_meta_stm()->get_insync_offset());
batch_builder.read_write_fence(fence.read_write_fence);
}
batch_builder.add_segments(
mdiff,
manifest_clean_offset,
highest_producer_id,
deadline,
_as,
checks_disabled ? cluster::segment_validated::no
: cluster::segment_validated::yes);
if (manifest_clean_offset.has_value()) {
batch_builder.mark_clean(manifest_clean_offset.value());
}
batch_builder.update_highest_producer_id(highest_producer_id);

auto error = co_await batch_builder.replicate();

if (
error != cluster::errc::success
&& error != cluster::errc::not_leader) {
Expand Down Expand Up @@ -2303,6 +2310,7 @@ ss::future<ntp_archiver::upload_group_result> ntp_archiver::wait_uploads(
}

ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads(
archival_stm_fence fence,
std::vector<ntp_archiver::scheduled_upload> scheduled) {
// Split the set of scheduled uploads into compacted and non compacted
// uploads, and then wait for them separately. They can also be waited on
Expand Down Expand Up @@ -2341,10 +2349,12 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::wait_all_scheduled_uploads(
auto [non_compacted_result, compacted_result]
= co_await ss::when_all_succeed(
wait_uploads(
fence,
std::move(non_compacted_uploads),
segment_upload_kind::non_compacted,
inline_manifest_in_non_compacted_uploads),
wait_uploads(
fence,
std::move(compacted_uploads),
segment_upload_kind::compacted,
!inline_manifest_in_non_compacted_uploads));
Expand Down Expand Up @@ -2374,6 +2384,7 @@ model::offset ntp_archiver::max_uploadable_offset_exclusive() const {
}

ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates(
archival_stm_fence fence,
std::optional<model::offset> unsafe_max_offset_override_exclusive) {
auto max_offset_exclusive = unsafe_max_offset_override_exclusive
? *unsafe_max_offset_override_exclusive
Expand All @@ -2389,7 +2400,7 @@ ss::future<ntp_archiver::batch_result> ntp_archiver::upload_next_candidates(
auto scheduled_uploads = co_await schedule_uploads(
max_offset_exclusive);
co_return co_await wait_all_scheduled_uploads(
std::move(scheduled_uploads));
fence, std::move(scheduled_uploads));
} catch (const ss::gate_closed_exception&) {
} catch (const ss::abort_requested_exception&) {
}
Expand Down
14 changes: 14 additions & 0 deletions src/v/cluster/archival/ntp_archiver_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ enum class wait_result { not_in_progress, complete, lost_leadership, failed };

std::ostream& operator<<(std::ostream& os, wait_result wr);

/// Fence value for the archival STM.
/// The value is used to implement optimistic
/// concurrency control.
struct archival_stm_fence {
// Offset of the last command added to the
// archival STM
model::offset read_write_fence;
// Disable fencing in tests
bool unsafe_add{false};
};

/// This class performs per-ntp archival workload. Every ntp can be
/// processed independently, without the knowledge about others. All
/// 'ntp_archiver' instances that the shard possesses are supposed to be
Expand Down Expand Up @@ -225,6 +236,7 @@ class ntp_archiver {
/// offset.
/// \return future that returns number of uploaded/failed segments
virtual ss::future<batch_result> upload_next_candidates(
archival_stm_fence fence,
std::optional<model::offset> unsafe_max_offset_override_exclusive
= std::nullopt);

Expand Down Expand Up @@ -498,13 +510,15 @@ class ntp_archiver {
///
/// Update the probe and manifest
ss::future<ntp_archiver::batch_result> wait_all_scheduled_uploads(
archival_stm_fence fence,
std::vector<ntp_archiver::scheduled_upload> scheduled);

/// Waits for scheduled segment uploads. The uploaded segments could be
/// compacted or non-compacted, the actions taken are similar in both
/// cases with the major difference being the probe updates done after
/// the upload.
ss::future<ntp_archiver::upload_group_result> wait_uploads(
archival_stm_fence fence,
std::vector<scheduled_upload> scheduled,
segment_upload_kind segment_kind,
bool inline_manifest);
Expand Down
109 changes: 3 additions & 106 deletions src/v/cluster/archival/tests/ntp_archiver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,9 @@ FIXTURE_TEST(
});

retry_chain_node fib(never_abort);
auto res = archiver.upload_next_candidates().get();
auto res = archiver
.upload_next_candidates(archival_stm_fence{.unsafe_add = true})
.get();

auto&& [non_compacted_result, compacted_result] = res;
BOOST_REQUIRE_EQUAL(non_compacted_result.num_succeeded, 0);
Expand Down Expand Up @@ -1861,111 +1863,6 @@ FIXTURE_TEST(test_manifest_spillover, archiver_fixture) {
test_manifest_spillover_impl(*this, 0x1000, 0x3000);
}

// NOLINTNEXTLINE
FIXTURE_TEST(test_upload_with_gap_blocked, archiver_fixture) {
std::vector<segment_desc> segments = {
{.ntp = manifest_ntp,
.base_offset = model::offset(0),
.term = model::term_id(1),
.num_records = 900},
{.ntp = manifest_ntp,
.base_offset = model::offset(1000),
.term = model::term_id(4),
.num_records = 1000},
};

init_storage_api_local(segments);
wait_for_partition_leadership(manifest_ntp);

auto part = app.partition_manager.local().get(manifest_ntp);
tests::cooperative_spin_wait_with_timeout(10s, [part]() mutable {
return part->last_stable_offset() >= model::offset(1000);
}).get();

vlog(
test_log.info,
"Partition is a leader, high-watermark: {}, partition: {}",
part->high_watermark(),
*part);

listen();

auto [arch_conf, remote_conf] = get_configurations();

auto manifest_view = ss::make_shared<cloud_storage::async_manifest_view>(
remote,
app.shadow_index_cache,
part->archival_meta_stm()->manifest(),
arch_conf->bucket_name,
path_provider);

archival::ntp_archiver archiver(
get_ntp_conf(),
arch_conf,
remote.local(),
app.shadow_index_cache.local(),
*part,
manifest_view);

auto action = ss::defer([&archiver, &manifest_view] {
archiver.stop().get();
manifest_view->stop().get();
});

retry_chain_node fib(never_abort);
auto res = upload_next_with_retries(archiver).get();

for (auto [url, req] : get_targets()) {
vlog(test_log.info, "{} {}", req.method, req.url);
}

// The archiver will upload both segments successfully but will be
// able to add to the manifest only the first one.
BOOST_REQUIRE_EQUAL(res.non_compacted_upload_result.num_succeeded, 2);
BOOST_REQUIRE_EQUAL(res.non_compacted_upload_result.num_failed, 0);
BOOST_REQUIRE_EQUAL(res.non_compacted_upload_result.num_cancelled, 0);
BOOST_REQUIRE_EQUAL(res.compacted_upload_result.num_succeeded, 0);
BOOST_REQUIRE_EQUAL(res.compacted_upload_result.num_failed, 0);
BOOST_REQUIRE_EQUAL(res.compacted_upload_result.num_cancelled, 0);

BOOST_REQUIRE_EQUAL(get_requests().size(), 5);

cloud_storage::partition_manifest manifest;
{
BOOST_REQUIRE(get_targets().count(manifest_url)); // NOLINT
auto req_opt = get_latest_request(manifest_url);
BOOST_REQUIRE(req_opt.has_value());
auto req = req_opt.value().get();
BOOST_REQUIRE_EQUAL(req.method, "PUT"); // NOLINT
manifest = load_manifest(req.content);
BOOST_REQUIRE(manifest == part->archival_meta_stm()->manifest());
}

{
segment_name segment1_name{"0-1-v1.log"};
auto segment1_url = get_segment_path(manifest, segment1_name);
auto req_opt = get_latest_request("/" + segment1_url().string());
BOOST_REQUIRE(req_opt.has_value());
auto req = req_opt.value().get();
BOOST_REQUIRE_EQUAL(req.method, "PUT"); // NOLINT
verify_segment(manifest_ntp, segment1_name, req.content);

auto index_url = get_segment_index_path(manifest, segment1_name);
auto index_req_maybe = get_latest_request("/" + index_url().string());
BOOST_REQUIRE(index_req_maybe.has_value());
auto index_req = index_req_maybe.value().get();
BOOST_REQUIRE_EQUAL(index_req.method, "PUT");
verify_index(manifest_ntp, segment1_name, manifest, index_req.content);
}

// The stm manifest should have only the first segment
BOOST_REQUIRE(part->archival_meta_stm());
const auto& stm_manifest = part->archival_meta_stm()->manifest();
BOOST_REQUIRE_EQUAL(stm_manifest.size(), 1);
BOOST_REQUIRE_EQUAL(
stm_manifest.last_segment()->base_offset, segments[0].base_offset);
}

FIXTURE_TEST(test_flush_not_leader, cloud_storage_manual_multinode_test_base) {
// start a second fixture and wait for stable setup
auto fx2 = start_second_fixture();
Expand Down
Loading

0 comments on commit 63eb5f5

Please sign in to comment.