archival: Use read-write fence in the ntp_archiver #24574

Open · wants to merge 10 commits into base: dev
10 changes: 8 additions & 2 deletions src/v/cloud_storage/tests/cloud_storage_e2e_test.cc

@@ -247,7 +247,10 @@ TEST_P(EndToEndFixture, TestProduceConsumeFromCloudWithSpillover) {
         log->force_roll(ss::default_priority_class()).get();

         ASSERT_TRUE(archiver.sync_for_tests().get());
-        archiver.upload_next_candidates().get();
+        archiver
+          .upload_next_candidates(
+            archival::archival_stm_fence{.unsafe_add = true})
+          .get();
     }
     ASSERT_EQ(
       cloud_storage::upload_result::success,

@@ -737,7 +740,10 @@ TEST_F(CloudStorageManualMultiNodeTestBase, ReclaimableReportedInHealthReport) {
     // drive the uploading
     auto& archiver = prt_l->archiver()->get();
     archiver.sync_for_tests().get();
-    archiver.upload_next_candidates().get();
+    archiver
+      .upload_next_candidates(
+        archival::archival_stm_fence{.unsafe_add = true})
+      .get();

     // not for synchronization... just to give the system time to propagate
     // all the state changes that are happening so that this overall loop
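Throughout the tests in this PR, upload_next_candidates() now takes an archival_stm_fence argument, and every test call site passes {.unsafe_add = true} to opt out of the new concurrency check. For orientation, here is a minimal, self-contained sketch of what that argument could look like; the member names unsafe_add and read_write_fence appear in this diff, but the types, defaults, and everything else are assumptions, not the real definition:

#include <cstdint>

// Hypothetical sketch only; the real type lives under src/v/cluster/archival.
namespace archival {

struct archival_stm_fence {
    // Offset of the last command applied to the archival STM when the
    // upload round was planned (model::offset in the real tree).
    int64_t read_write_fence{-1};
    // Test-only escape hatch: replicate the metadata batch without
    // checking the fence.
    bool unsafe_add{false};
};

} // namespace archival

int main() {
    // The tests in this PR construct the argument exactly like this:
    [[maybe_unused]] auto fence = archival::archival_stm_fence{
      .unsafe_add = true};
}

With unsafe_add set, the single-writer tests keep their old behavior; production call sites would populate read_write_fence instead.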
8 changes: 6 additions & 2 deletions src/v/cloud_storage/tests/delete_records_e2e_test.cc

@@ -417,7 +417,9 @@ FIXTURE_TEST(
     while (produced_kafka_base_offset > stm_manifest.get_next_kafka_offset()) {
         BOOST_REQUIRE(archiver->sync_for_tests().get());
         BOOST_REQUIRE_EQUAL(
-          archiver->upload_next_candidates()
+          archiver
+            ->upload_next_candidates(
+              archival::archival_stm_fence{.unsafe_add = true})
             .get()
             .non_compacted_upload_result.num_failed,
           0);

@@ -497,7 +499,9 @@ FIXTURE_TEST(test_delete_from_stm_truncation, delete_records_e2e_fixture) {
     // Upload more and truncate the manifest past the override.
     BOOST_REQUIRE(archiver->sync_for_tests().get());
     BOOST_REQUIRE_EQUAL(
-      archiver->upload_next_candidates()
+      archiver
+        ->upload_next_candidates(
+          archival::archival_stm_fence{.unsafe_add = true})
         .get()
         .non_compacted_upload_result.num_failed,
       0);
3 changes: 2 additions & 1 deletion src/v/cloud_storage/tests/produce_utils.h

@@ -90,7 +90,8 @@ class remote_segment_generator {
             co_return -1;
         }
         if (
-          (co_await archiver.upload_next_candidates())
+          (co_await archiver.upload_next_candidates(
+            archival::archival_stm_fence{.unsafe_add = true}))
             .non_compacted_upload_result.num_failed
           > 0) {
             co_return -1;
@@ -120,7 +120,10 @@ TEST_F(TopicRecoveryFixture, TestTopicNamespaceOverrideRecovery) {

     // Sync archiver, upload candidates (if needed) and upload manifest.
     archiver.sync_for_tests().get();
-    std::ignore = archiver.upload_next_candidates().get();
+    std::ignore = archiver
+                    .upload_next_candidates(
+                      archival::archival_stm_fence{.unsafe_add = true})
+                    .get();
     archiver.upload_topic_manifest().get();
 }


@@ -188,7 +191,10 @@ TEST_F(TopicRecoveryFixture, TestTopicNamespaceOverrideRecovery) {
       .get());

     archiver.sync_for_tests().get();
-    std::ignore = archiver.upload_next_candidates().get();
+    std::ignore = archiver
+                    .upload_next_candidates(
+                      archival::archival_stm_fence{.unsafe_add = true})
+                    .get();

     // Check requests with the same predicate at end of scope, just to be
     // explicit about bad requests.
29 changes: 16 additions & 13 deletions src/v/cluster/archival/adjacent_segment_merger.cc

@@ -213,29 +213,32 @@ adjacent_segment_merger::run(run_quota_t quota) {
       const cloud_storage::partition_manifest& manifest) {
         return scan_manifest(local_start_offset, manifest);
     };
-    auto [archiver_units, upl] = co_await _archiver.find_reupload_candidate(
-      scanner);
-    if (!upl.has_value()) {
+    auto find_res = co_await _archiver.find_reupload_candidate(scanner);
+    if (!find_res.locks.has_value()) {
         vlog(_ctxlog.debug, "No more upload candidates");
         co_return result;
     }
-    vassert(archiver_units.has_value(), "Must take archiver units");
-    auto next = model::next_offset(upl->candidate.final_offset);
+    vassert(find_res.units.has_value(), "Must take archiver units");
+    auto next = model::next_offset(find_res.locks->candidate.final_offset);
     vlog(
       _ctxlog.debug,
-      "Going to upload segment {}, num source segments {}, last offset {}",
-      upl->candidate.exposed_name,
-      upl->candidate.sources.size(),
-      upl->candidate.final_offset);
-    for (const auto& src : upl->candidate.sources) {
+      "Going to upload segment {}, num source segments {}, last offset {}, "
+      "read-write-fence value: {}",
+      find_res.locks->candidate.exposed_name,
+      find_res.locks->candidate.sources.size(),
+      find_res.locks->candidate.final_offset,
+      find_res.read_write_fence.read_write_fence);
+    for (const auto& src : find_res.locks->candidate.sources) {
         vlog(
           _ctxlog.debug,
           "Local log segment {} found, size {}",
           src->filename(),
           src->size_bytes());
     }
+
     auto uploaded = co_await _archiver.upload(
-      std::move(*archiver_units), std::move(*upl), std::ref(_root_rtc));
+      std::move(find_res), std::ref(_root_rtc));
+
     if (uploaded) {
         _last = next;
         result.status = run_status::ok;

@@ -246,14 +249,14 @@ adjacent_segment_merger::run(run_quota_t quota) {
         result.remaining = result.remaining - run_quota_t{1};
         vlog(
           _ctxlog.debug,
-          "Successfuly uploaded segment, new last offfset is {}",
+          "Successfully uploaded segment, new last offset is {}",
           _last);
     } else {
         // Upload failed
         result.status = run_status::failed;
         vlog(
           _ctxlog.debug,
-          "Failed to upload segment, last offfset is {}",
+          "Failed to upload segment, last offset is {}",
           _last);
     }
 }
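A note on the shape change above: find_reupload_candidate() used to return an [archiver_units, upl] pair and now returns a single find_res aggregate, so the fence value travels together with the candidate and its locks into _archiver.upload(). A rough sketch of the shape the call sites imply — only the member names (units, locks, read_write_fence) come from the diff; the types here are stand-ins, not Redpanda's:

#include <optional>

// Stand-ins for ssx::semaphore_units, the candidate-plus-read-locks
// type, and the fence; none of these definitions are the real ones.
struct archiver_units {};
struct upload_candidate_with_locks {};
struct archival_stm_fence {
    long read_write_fence{-1};
    bool unsafe_add{false};
};

// Aggregate implied by find_res.units / find_res.locks /
// find_res.read_write_fence in adjacent_segment_merger.cc.
struct find_reupload_candidate_result {
    std::optional<archiver_units> units;               // held archiver units
    std::optional<upload_candidate_with_locks> locks;  // candidate + locks
    archival_stm_fence read_write_fence;               // fence to replicate
};

Bundling all three into one object is what allows the two-argument upload(std::move(find_res), std::ref(_root_rtc)) call above.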
50 changes: 23 additions & 27 deletions src/v/cluster/archival/archival_metadata_stm.cc

@@ -1041,6 +1041,11 @@ ss::future<std::error_code> archival_metadata_stm::do_add_segments(
 }

 ss::future<> archival_metadata_stm::do_apply(const model::record_batch& b) {
+    /*TODO: remove*/ vlog(
+      _log.debug,
+      "NEEDLE do_apply applying batch {} ({})",
+      b.base_offset(),
+      b.header().type);
     if (
       b.header().type != model::record_batch_type::archival_metadata
       && b.header().type != model::record_batch_type::prefix_truncate) {

@@ -1071,14 +1076,24 @@ ss::future<> archival_metadata_stm::do_apply(const model::record_batch& b) {
             }
         });
     } else {
-        auto on_exit = ss::defer(
-          [this] { maybe_notify_waiter(errc::success); });
+        /*TODO: remove*/ vlog(_log.debug, "NEEDLE do_apply called");
+        auto on_exit = ss::defer([this] {
+            maybe_notify_waiter(errc::success);
+            /*TODO: remove*/ vlog(_log.debug, "NEEDLE do_apply completed");
+        });
         try {
             b.for_each_record([this, base_offset = b.base_offset()](
                                 model::record&& r) {
                 auto key = serde::from_iobuf<cmd_key>(r.release_key());

+                /*TODO: remove*/ vlog(
+                  _log.debug, "NEEDLE do_apply command key {}", key);
+
                 if (key != read_write_fence_cmd::key) {
+                    /*TODO: remove*/ vlog(
+                      _log.debug,
+                      "NEEDLE do_apply advance applied offset: {}",
+                      base_offset + model::offset{r.offset_delta()});
                     _manifest->advance_applied_offset(
                       base_offset + model::offset{r.offset_delta()});
                 }

@@ -1152,6 +1167,10 @@ ss::future<> archival_metadata_stm::do_apply(const model::record_batch& b) {
                 if (apply_read_write_fence(
                       serde::from_iobuf<read_write_fence_cmd>(
                         r.release_value()))) {
+                    /*TODO: remove*/ vlog(
+                      _log.debug,
+                      "NEEDLE do_apply rw-fence concurrency violation {}",
+                      r.release_value());
                     // This means that there is a concurrency violation. The
                     // fence was created before some other command was
                     // applied. We can't apply the commands from this batch.
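The comment above is the core of the mechanism: a read_write_fence_cmd carries the applied offset its writer observed, and apply_read_write_fence() reports a violation when the STM has applied anything newer in the meantime. A toy model of that check, assuming only what this diff shows (the manifest tracks an applied offset via advance_applied_offset() / get_applied_offset()); the real predicate may differ in detail:

#include <cassert>
#include <cstdint>

// Toy model of the fence check; not the Redpanda implementation.
struct manifest_model {
    int64_t applied_offset{-1};

    void advance_applied_offset(int64_t o) {
        if (o > applied_offset) {
            applied_offset = o;
        }
    }
    int64_t get_applied_offset() const { return applied_offset; }
};

// Mirrors the contract suggested above: returns true on a concurrency
// violation, i.e. some command was applied after the fence was captured.
bool apply_read_write_fence(const manifest_model& m, int64_t fence) {
    return m.get_applied_offset() > fence;
}

int main() {
    manifest_model m;
    int64_t fence = m.get_applied_offset();    // writer captures the fence
    assert(!apply_read_write_fence(m, fence)); // nothing applied since: ok
    m.advance_applied_offset(0);               // a concurrent command lands
    assert(apply_read_write_fence(m, fence));  // now the batch is rejected
}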

@@ -1335,7 +1354,8 @@ archival_metadata_stm::take_local_snapshot(ssx::semaphore_units apply_units) {
           .last_partition_scrub = _manifest->last_partition_scrub(),
           .last_scrubbed_offset = _manifest->last_scrubbed_offset(),
           .detected_anomalies = _manifest->detected_anomalies(),
-          .highest_producer_id = _manifest->highest_producer_id()});
+          .highest_producer_id = _manifest->highest_producer_id(),
+          .applied_offset = _manifest->get_applied_offset()});
     auto snapshot_offset = last_applied_offset();
     apply_units.return_all();


@@ -1403,30 +1423,6 @@ void archival_metadata_stm::maybe_notify_waiter(std::exception_ptr e) noexcept {

 void archival_metadata_stm::apply_add_segment(const segment& segment) {
     auto meta = segment.meta;
-    bool disable_safe_add
-      = config::shard_local_cfg()
-          .cloud_storage_disable_metadata_consistency_checks.value();
-    if (
-      !disable_safe_add && segment.is_validated == segment_validated::yes
-      && !_manifest->safe_segment_meta_to_add(meta)) {
-        // We're only validating segment metadata records if they're validated.
-        // It goes like this
-        // - npt_archiver_service validates segment_meta instances before
-        //   replication
-        // - replicated add_segment commands have 'is_validated' field set to
-        //   'yes'
-        // - old records in the log have 'is_validated' field set to 'no'
-        // - the 'apply_add_segment' will only validate new commands and add old
-        //   ones unconditionally
-        auto last = _manifest->last_segment();
-        vlog(
-          _logger.error,
-          "Can't add segment: {}, previous segment: {}",
-          meta,
-          last);
-        maybe_notify_waiter(errc::inconsistent_stm_update);
-        return;
-    }
     if (meta.ntp_revision == model::initial_revision_id{}) {
         // metadata serialized by old versions of redpanda doesn't have the
         // ntp_revision field.
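Dropping the apply-time safe_segment_meta_to_add() check is the payoff of the fence: per the removed comment, the archiver already validates segment_meta before replication, and the fence guarantees nothing else was applied between that validation and do_apply, so revalidating at apply time becomes redundant. A self-contained toy of the race the fence closes — none of this is Redpanda API; it only mirrors the protocol shape:

#include <cstdint>
#include <iostream>

// Toy STM: a batch carries the fence its writer captured; applying it
// fails if any other batch was applied after that capture.
struct toy_stm {
    int64_t applied{-1};

    bool try_apply(int64_t fence, int64_t batch_offset) {
        if (applied > fence) {
            return false; // fence violation: drop the whole batch
        }
        applied = batch_offset;
        return true;
    }
};

int main() {
    toy_stm stm;
    // Writer A validates its segment against state as of offset -1
    // and captures that as its fence.
    int64_t fence_a = stm.applied;
    // Writer B replicates first; A's earlier validation is now stale.
    stm.try_apply(stm.applied, 0);
    // A's batch is rejected instead of adding inconsistent metadata.
    std::cout << std::boolalpha << stm.try_apply(fence_a, 1) << '\n'; // false
}

Without the fence, writer A's batch would apply on top of state it never validated against; with it, the batch is dropped and the writer must re-sync and retry.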