Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

archival: Disable cross-term compaction #24566

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/v/cluster/archival/segment_reupload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "segment_reupload.h"

#include "base/vlog.h"
#include "cloud_storage/partition_manifest.h"
#include "config/configuration.h"
#include "logger.h"
Expand Down Expand Up @@ -202,6 +203,8 @@ void segment_collector::do_collect(segment_collector_mode mode) {

align_end_offset_to_manifest(
_target_end_inclusive.value_or(current_segment_end));

align_with_term_boundary();
}

model::offset segment_collector::find_replacement_boundary() const {
Expand Down Expand Up @@ -273,6 +276,42 @@ void segment_collector::align_end_offset_to_manifest(
}
}

/// Clamp the end of the collected range so that it does not extend past the
/// last offset of the term that contains '_begin_inclusive'.
///
/// Looks up the term of '_begin_inclusive' in the local log, then the last
/// offset of that term, and lowers '_end_inclusive' to that offset if it is
/// currently larger. '_end_inclusive' is never increased.
///
/// \throws std::runtime_error if the log can't report the term of
///         '_begin_inclusive' or the last offset of that term.
void segment_collector::align_with_term_boundary() {
    const auto begin_term = _log.get_term(_begin_inclusive);
    if (!begin_term) {
        vlog(
          archival_log.warn,
          "Can't find term for offset {}, the log could be truncated",
          _begin_inclusive);
        throw std::runtime_error(fmt_with_ctx(
          fmt::format, "Can't find term for offset {}", _begin_inclusive));
    }

    const auto term_last_offset = _log.get_term_last_offset(*begin_term);
    if (!term_last_offset) {
        vlog(
          archival_log.warn,
          "Can't find last offset for term {}",
          *begin_term);
        throw std::runtime_error(fmt_with_ctx(
          fmt::format, "Can't find last offset for term {}", *begin_term));
    }

    vlog(
      archival_log.debug,
      "align_with_term_boundary, begin={}, end={}, term={}, "
      "last-term-offset={}",
      _begin_inclusive,
      _end_inclusive,
      *begin_term,
      *term_last_offset);

    // Invariant: the last offset of a term can't fall in the middle of a
    // segment because a segment is rolled whenever a new term starts.
    //
    // NOTE: if the segment lookup overshot the term, cutting the range at
    // the term's last offset is safe: it is guaranteed to be the committed
    // offset of some segment in the manifest.
    if (*term_last_offset < _end_inclusive) {
        _end_inclusive = *term_last_offset;
    }
}

segment_collector::lookup_result segment_collector::find_next_segment(
model::offset start_offset, segment_collector_mode mode) {
// 'start_offset' should always be above the start offset of the local log
Expand Down
3 changes: 3 additions & 0 deletions src/v/cluster/archival/segment_reupload.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ class segment_collector {
/// overlap.
void align_end_offset_to_manifest(model::offset compacted_segment_end);

/// Make sure that reupload begins and ends in the same term.
void align_with_term_boundary();

/// Finds the offset which the collection needs to progress upto in order to
/// replace at least one manifest segment. The collection is valid if it
/// reaches the replacement boundary.
Expand Down
107 changes: 89 additions & 18 deletions src/v/cluster/archival/tests/ntp_archiver_reupload_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ FIXTURE_TEST(test_upload_compacted_segments, reupload_fixture) {
FIXTURE_TEST(test_upload_compacted_segments_concat, reupload_fixture) {
std::vector<segment_desc> segments = {
{manifest_ntp, model::offset(0), model::term_id(1), 1000, 2},
{manifest_ntp, model::offset(1000), model::term_id(4), 10, 2},
{manifest_ntp, model::offset(1000), model::term_id(1), 10, 2},
};

initialize(segments);
Expand All @@ -385,7 +385,7 @@ FIXTURE_TEST(test_upload_compacted_segments_concat, reupload_fixture) {

auto manifest = verify_manifest_request(*part);
verify_segment_request("0-1-v1.log", manifest);
verify_segment_request("1000-4-v1.log", manifest);
verify_segment_request("1000-1-v1.log", manifest);

BOOST_REQUIRE(part->archival_meta_stm());
const cloud_storage::partition_manifest& stm_manifest
Expand Down Expand Up @@ -414,7 +414,7 @@ FIXTURE_TEST(test_upload_compacted_segments_concat, reupload_fixture) {
BOOST_REQUIRE_EQUAL(replaced[1].base_offset, model::offset{1000});

verify_concat_segment_request(
{"0-1-v1.log", "1000-4-v1.log"}, part->archival_meta_stm()->manifest());
{"0-1-v1.log", "1000-1-v1.log"}, part->archival_meta_stm()->manifest());
}

FIXTURE_TEST(
Expand Down Expand Up @@ -746,10 +746,10 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
// NOTE: all initial segments share the same term; the test expects a single
// concatenated reupload covering the first four of them.
std::vector<segment_desc> segments = {
{manifest_ntp, model::offset(0), model::term_id(1), 10, 2},
{manifest_ntp, model::offset(10), model::term_id(2), 10, 2},
{manifest_ntp, model::offset(20), model::term_id(3), 10, 2},
{manifest_ntp, model::offset(30), model::term_id(4), 10, 2},
{manifest_ntp, model::offset(40), model::term_id(5), 10, 2},
{manifest_ntp, model::offset(10), model::term_id(1), 10, 2},
{manifest_ntp, model::offset(20), model::term_id(1), 10, 2},
{manifest_ntp, model::offset(30), model::term_id(1), 10, 2},
{manifest_ntp, model::offset(40), model::term_id(1), 10, 2},
};

initialize(segments);
Expand All @@ -768,9 +768,9 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
get_targets().find(manifest_url)->second.content);

verify_segment_request("0-1-v1.log", manifest);
verify_segment_request("10-2-v1.log", manifest);
verify_segment_request("20-3-v1.log", manifest);
verify_segment_request("30-4-v1.log", manifest);
verify_segment_request("10-1-v1.log", manifest);
verify_segment_request("20-1-v1.log", manifest);
verify_segment_request("30-1-v1.log", manifest);

BOOST_REQUIRE(part->archival_meta_stm());
const cloud_storage::partition_manifest& stm_manifest
Expand All @@ -790,7 +790,7 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
create_segment(
{manifest_ntp,
last_segment->offsets().get_committed_offset() + model::offset{1},
model::term_id{6},
model::term_id{2},
10});
}

Expand All @@ -805,13 +805,13 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
BOOST_REQUIRE_EQUAL(get_requests().size(), 9);

verify_segment_request(
"40-5-v1.log", part->archival_meta_stm()->manifest());
"40-1-v1.log", part->archival_meta_stm()->manifest());
verify_segment_request(
"50-6-v1.log", part->archival_meta_stm()->manifest());
"50-2-v1.log", part->archival_meta_stm()->manifest());
verify_segment_request(
"65-6-v1.log", part->archival_meta_stm()->manifest());
"65-2-v1.log", part->archival_meta_stm()->manifest());
verify_segment_request(
"85-6-v1.log", part->archival_meta_stm()->manifest());
"85-2-v1.log", part->archival_meta_stm()->manifest());

BOOST_REQUIRE_EQUAL(
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{});
Expand All @@ -828,9 +828,9 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
verify_concat_segment_request(
{
"0-1-v1.log",
"10-2-v1.log",
"20-3-v1.log",
"30-4-v1.log",
"10-1-v1.log",
"20-1-v1.log",
"30-1-v1.log",
},
part->archival_meta_stm()->manifest());

Expand All @@ -841,3 +841,74 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
replaced = stm_manifest.replaced_segments();
BOOST_REQUIRE_EQUAL(replaced.size(), 4);
}

// Checks compacted reupload when the two local segments belong to different
// terms (1 and 4). After the initial upload and a self-compaction pass, the
// reupload is expected to report two compacted uploads, and the manifest is
// expected to keep one entry per original segment, each with its own term.
FIXTURE_TEST(test_upload_compacted_segments_cross_term, reupload_fixture) {
// Two adjacent segments: base offset 0 in term 1 and base offset 1000 in
// term 4.
std::vector<segment_desc> segments = {
{manifest_ntp, model::offset(0), model::term_id(1), 1000, 2},
{manifest_ntp, model::offset(1000), model::term_id(4), 10, 2},
};

initialize(segments);
// Stop the archiver when the test body exits, including on a failed
// BOOST_REQUIRE.
auto action = ss::defer([this] { archiver->stop().get(); });

auto part = app.partition_manager.local().get(manifest_ntp);
listen();

// Upload two non-compacted segments, no segment is compacted yet.
// NOTE(review): the first triple presumably describes non-compacted uploads
// and the second compacted ones -- confirm against batch_result.
archival::ntp_archiver::batch_result expected{{2, 0, 0}, {0, 0, 0}};
upload_and_verify(archiver.value(), expected);

// Two segments, two indices, one manifest
BOOST_REQUIRE_EQUAL(get_requests().size(), 5);

auto manifest = verify_manifest_request(*part);
verify_segment_request("0-1-v1.log", manifest);
verify_segment_request("1000-4-v1.log", manifest);

BOOST_REQUIRE(part->archival_meta_stm());
const cloud_storage::partition_manifest& stm_manifest
= part->archival_meta_stm()->manifest();
verify_stm_manifest(stm_manifest, segments);

// Nothing compacted has been uploaded yet, so the offset is still
// default-constructed.
BOOST_REQUIRE_EQUAL(
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{});

// Mark both segments compacted, and re-upload. Both segments are
// re-uploaded.
reset_http_call_state();

vlog(test_log.info, "Waiting for segments to self-compact");
auto seg = self_compact_next_segment();
vlog(test_log.info, "Self-compaction completed");

// This time the expectation is two uploads in the second (compacted) group
// and none in the first.
expected = archival::ntp_archiver::batch_result{{0, 0, 0}, {2, 0, 0}};
upload_and_verify(archiver.value(), expected);
BOOST_REQUIRE_EQUAL(get_requests().size(), 5);

// The last uploaded compacted offset must match the committed offset of the
// segment returned by self_compact_next_segment().
BOOST_REQUIRE_EQUAL(
stm_manifest.get_last_uploaded_compacted_offset(),
seg->offsets().get_committed_offset());

// Both original uploads are expected to be recorded as replaced.
auto replaced = stm_manifest.replaced_segments();
BOOST_REQUIRE_EQUAL(replaced.size(), 2);
BOOST_REQUIRE_EQUAL(replaced[0].base_offset, model::offset{0});
BOOST_REQUIRE_EQUAL(replaced[1].base_offset, model::offset{1000});

// We can't reupload across terms, so we should end up with two
// compacted uploads.

// First manifest entry: offsets [0, 999] in term 1.
BOOST_REQUIRE_EQUAL(
stm_manifest.get(model::offset(0))->base_offset, model::offset(0));
BOOST_REQUIRE_EQUAL(
stm_manifest.get(model::offset(0))->committed_offset, model::offset(999));
BOOST_REQUIRE_EQUAL(
stm_manifest.get(model::offset(0))->segment_term, model::term_id(1));

// Second manifest entry: offsets [1000, 1009] in term 4.
BOOST_REQUIRE_EQUAL(
stm_manifest.get(model::offset(1000))->base_offset, model::offset(1000));
BOOST_REQUIRE_EQUAL(
stm_manifest.get(model::offset(1000))->committed_offset,
model::offset(1009));
BOOST_REQUIRE_EQUAL(
stm_manifest.get(model::offset(1000))->segment_term, model::term_id(4));
}
Loading