Skip to content

Commit f1bbcf6

Browse files
committed
archival: Disable cross-term compaction
Currently, the segment_collector can collect data from segments generated in different terms. The component is used to generate compacted reuploads. This commit fixes the problem by disabling cross-term segment collection. Signed-off-by: Evgeny Lazin <[email protected]>
1 parent 60ad2f4 commit f1bbcf6

File tree

3 files changed

+131
-18
lines changed

3 files changed

+131
-18
lines changed

src/v/cluster/archival/segment_reupload.cc

+39
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include "segment_reupload.h"
1212

13+
#include "base/vlog.h"
1314
#include "cloud_storage/partition_manifest.h"
1415
#include "config/configuration.h"
1516
#include "logger.h"
@@ -202,6 +203,8 @@ void segment_collector::do_collect(segment_collector_mode mode) {
202203

203204
align_end_offset_to_manifest(
204205
_target_end_inclusive.value_or(current_segment_end));
206+
207+
align_with_term_boundary();
205208
}
206209

207210
model::offset segment_collector::find_replacement_boundary() const {
@@ -273,6 +276,42 @@ void segment_collector::align_end_offset_to_manifest(
273276
}
274277
}
275278

279+
// Clamp '_end_inclusive' so that it does not go past the last offset of the
// term that contains '_begin_inclusive'.
//
// Throws std::runtime_error when the log can't report either the term of
// '_begin_inclusive' or the last offset of that term; in both cases a
// warning is logged first.
void segment_collector::align_with_term_boundary() {
280+
// Look up the term that contains '_begin_inclusive'.
auto opt_term = _log.get_term(_begin_inclusive);
281+
if (!opt_term.has_value()) {
282+
vlog(
283+
archival_log.warn,
284+
"Can't find term for offset {}, the log could be truncated",
285+
_begin_inclusive);
286+
throw std::runtime_error(fmt_with_ctx(
287+
fmt::format, "Can't find term for offset {}", _begin_inclusive));
288+
}
289+
// Last offset of that term, as reported by the log.
auto opt_max_offset = _log.get_term_last_offset(opt_term.value());
290+
if (!opt_max_offset.has_value()) {
291+
vlog(
292+
archival_log.warn,
293+
"Can't find last offset for term {}",
294+
opt_term.value());
295+
throw std::runtime_error(fmt_with_ctx(
296+
fmt::format, "Can't find last offset for term {}", opt_term.value()));
297+
}
298+
vlog(
299+
archival_log.debug,
300+
"align_with_term_boundary, begin={}, end={}, term={}, "
301+
"last-term-offset={}",
302+
_begin_inclusive,
303+
_end_inclusive,
304+
opt_term.value(),
305+
opt_max_offset.value());
306+
// Invariant: 'max_offset' can't be in the middle of the segment
307+
// because we're rolling a segment when new term starts.
308+
auto max_offset = opt_max_offset.value();
309+
// NOTE: if the segment lookup overshoots we can return max_offset
310+
// because it's guaranteed to be a committed offset of some segment
311+
// in the manifest.
312+
// The end offset is only ever moved backwards here (std::min), never forward.
_end_inclusive = std::min(max_offset, _end_inclusive);
313+
}
314+
276315
segment_collector::lookup_result segment_collector::find_next_segment(
277316
model::offset start_offset, segment_collector_mode mode) {
278317
// 'start_offset' should always be above the start offset of the local log

src/v/cluster/archival/segment_reupload.h

+3
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ class segment_collector {
113113
/// overlap.
114114
void align_end_offset_to_manifest(model::offset compacted_segment_end);
115115

116+
/// Make sure that reupload begins and ends in the same term.
117+
void align_with_term_boundary();
118+
116119
/// Finds the offset which the collection needs to progress up to in order to
117120
/// replace at least one manifest segment. The collection is valid if it
118121
/// reaches the replacement boundary.

src/v/cluster/archival/tests/ntp_archiver_reupload_test.cc

+89-18
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ FIXTURE_TEST(test_upload_compacted_segments, reupload_fixture) {
367367
FIXTURE_TEST(test_upload_compacted_segments_concat, reupload_fixture) {
368368
std::vector<segment_desc> segments = {
369369
{manifest_ntp, model::offset(0), model::term_id(1), 1000, 2},
370-
{manifest_ntp, model::offset(1000), model::term_id(4), 10, 2},
370+
{manifest_ntp, model::offset(1000), model::term_id(1), 10, 2},
371371
};
372372

373373
initialize(segments);
@@ -385,7 +385,7 @@ FIXTURE_TEST(test_upload_compacted_segments_concat, reupload_fixture) {
385385

386386
auto manifest = verify_manifest_request(*part);
387387
verify_segment_request("0-1-v1.log", manifest);
388-
verify_segment_request("1000-4-v1.log", manifest);
388+
verify_segment_request("1000-1-v1.log", manifest);
389389

390390
BOOST_REQUIRE(part->archival_meta_stm());
391391
const cloud_storage::partition_manifest& stm_manifest
@@ -414,7 +414,7 @@ FIXTURE_TEST(test_upload_compacted_segments_concat, reupload_fixture) {
414414
BOOST_REQUIRE_EQUAL(replaced[1].base_offset, model::offset{1000});
415415

416416
verify_concat_segment_request(
417-
{"0-1-v1.log", "1000-4-v1.log"}, part->archival_meta_stm()->manifest());
417+
{"0-1-v1.log", "1000-1-v1.log"}, part->archival_meta_stm()->manifest());
418418
}
419419

420420
FIXTURE_TEST(
@@ -746,10 +746,10 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
746746
// NOTE: different terms so compaction leaves one segment each. (NOTE(review): the initial segments below now all use term 1 — confirm this note still holds.)
747747
std::vector<segment_desc> segments = {
748748
{manifest_ntp, model::offset(0), model::term_id(1), 10, 2},
749-
{manifest_ntp, model::offset(10), model::term_id(2), 10, 2},
750-
{manifest_ntp, model::offset(20), model::term_id(3), 10, 2},
751-
{manifest_ntp, model::offset(30), model::term_id(4), 10, 2},
752-
{manifest_ntp, model::offset(40), model::term_id(5), 10, 2},
749+
{manifest_ntp, model::offset(10), model::term_id(1), 10, 2},
750+
{manifest_ntp, model::offset(20), model::term_id(1), 10, 2},
751+
{manifest_ntp, model::offset(30), model::term_id(1), 10, 2},
752+
{manifest_ntp, model::offset(40), model::term_id(1), 10, 2},
753753
};
754754

755755
initialize(segments);
@@ -768,9 +768,9 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
768768
get_targets().find(manifest_url)->second.content);
769769

770770
verify_segment_request("0-1-v1.log", manifest);
771-
verify_segment_request("10-2-v1.log", manifest);
772-
verify_segment_request("20-3-v1.log", manifest);
773-
verify_segment_request("30-4-v1.log", manifest);
771+
verify_segment_request("10-1-v1.log", manifest);
772+
verify_segment_request("20-1-v1.log", manifest);
773+
verify_segment_request("30-1-v1.log", manifest);
774774

775775
BOOST_REQUIRE(part->archival_meta_stm());
776776
const cloud_storage::partition_manifest& stm_manifest
@@ -790,7 +790,7 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
790790
create_segment(
791791
{manifest_ntp,
792792
last_segment->offsets().get_committed_offset() + model::offset{1},
793-
model::term_id{6},
793+
model::term_id{2},
794794
10});
795795
}
796796

@@ -805,13 +805,13 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
805805
BOOST_REQUIRE_EQUAL(get_requests().size(), 9);
806806

807807
verify_segment_request(
808-
"40-5-v1.log", part->archival_meta_stm()->manifest());
808+
"40-1-v1.log", part->archival_meta_stm()->manifest());
809809
verify_segment_request(
810-
"50-6-v1.log", part->archival_meta_stm()->manifest());
810+
"50-2-v1.log", part->archival_meta_stm()->manifest());
811811
verify_segment_request(
812-
"65-6-v1.log", part->archival_meta_stm()->manifest());
812+
"65-2-v1.log", part->archival_meta_stm()->manifest());
813813
verify_segment_request(
814-
"85-6-v1.log", part->archival_meta_stm()->manifest());
814+
"85-2-v1.log", part->archival_meta_stm()->manifest());
815815

816816
BOOST_REQUIRE_EQUAL(
817817
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{});
@@ -828,9 +828,9 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
828828
verify_concat_segment_request(
829829
{
830830
"0-1-v1.log",
831-
"10-2-v1.log",
832-
"20-3-v1.log",
833-
"30-4-v1.log",
831+
"10-1-v1.log",
832+
"20-1-v1.log",
833+
"30-1-v1.log",
834834
},
835835
part->archival_meta_stm()->manifest());
836836

@@ -841,3 +841,74 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
841841
replaced = stm_manifest.replaced_segments();
842842
BOOST_REQUIRE_EQUAL(replaced.size(), 4);
843843
}
844+
845+
// Two adjacent segments that belong to different terms (1 and 4) are uploaded,
// compacted and re-uploaded. The final assertions check that the manifest
// still holds one entry per term, i.e. the compacted reupload did not merge
// the segments across the term boundary.
FIXTURE_TEST(test_upload_compacted_segments_cross_term, reupload_fixture) {
846+
std::vector<segment_desc> segments = {
847+
{manifest_ntp, model::offset(0), model::term_id(1), 1000, 2},
848+
{manifest_ntp, model::offset(1000), model::term_id(4), 10, 2},
849+
};
850+
851+
initialize(segments);
852+
auto action = ss::defer([this] { archiver->stop().get(); });
853+
854+
auto part = app.partition_manager.local().get(manifest_ntp);
855+
listen();
856+
857+
// Upload two non-compacted segments, no segment is compacted yet.
858+
archival::ntp_archiver::batch_result expected{{2, 0, 0}, {0, 0, 0}};
859+
upload_and_verify(archiver.value(), expected);
860+
861+
// Two segments, two indices, one manifest
862+
BOOST_REQUIRE_EQUAL(get_requests().size(), 5);
863+
864+
auto manifest = verify_manifest_request(*part);
865+
verify_segment_request("0-1-v1.log", manifest);
866+
verify_segment_request("1000-4-v1.log", manifest);
867+
868+
BOOST_REQUIRE(part->archival_meta_stm());
869+
const cloud_storage::partition_manifest& stm_manifest
870+
= part->archival_meta_stm()->manifest();
871+
verify_stm_manifest(stm_manifest, segments);
872+
873+
// Nothing compacted has been uploaded yet, so the offset is still default.
BOOST_REQUIRE_EQUAL(
874+
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{});
875+
876+
// Mark both segments compacted, and re-upload. Both segments are
877+
// re-uploaded.
878+
reset_http_call_state();
879+
880+
vlog(test_log.info, "Waiting for segments to self-compact");
881+
auto seg = self_compact_next_segment();
882+
vlog(test_log.info, "Self-compaction completed");
883+
884+
// This time no regular uploads and two compacted uploads are expected.
expected = archival::ntp_archiver::batch_result{{0, 0, 0}, {2, 0, 0}};
885+
upload_and_verify(archiver.value(), expected);
886+
BOOST_REQUIRE_EQUAL(get_requests().size(), 5);
887+
888+
BOOST_REQUIRE_EQUAL(
889+
stm_manifest.get_last_uploaded_compacted_offset(),
890+
seg->offsets().get_committed_offset());
891+
892+
auto replaced = stm_manifest.replaced_segments();
893+
BOOST_REQUIRE_EQUAL(replaced.size(), 2);
894+
BOOST_REQUIRE_EQUAL(replaced[0].base_offset, model::offset{0});
895+
BOOST_REQUIRE_EQUAL(replaced[1].base_offset, model::offset{1000});
896+
897+
// A reupload can't span terms, so we should end up with two
898+
// compacted uploads.
899+
900+
// First segment: offsets [0, 999] in term 1.
BOOST_REQUIRE_EQUAL(
901+
stm_manifest.get(model::offset(0))->base_offset, model::offset(0));
902+
BOOST_REQUIRE_EQUAL(
903+
stm_manifest.get(model::offset(0))->committed_offset, model::offset(999));
904+
BOOST_REQUIRE_EQUAL(
905+
stm_manifest.get(model::offset(0))->segment_term, model::term_id(1));
906+
907+
// Second segment: offsets [1000, 1009] in term 4.
BOOST_REQUIRE_EQUAL(
908+
stm_manifest.get(model::offset(1000))->base_offset, model::offset(1000));
909+
BOOST_REQUIRE_EQUAL(
910+
stm_manifest.get(model::offset(1000))->committed_offset,
911+
model::offset(1009));
912+
BOOST_REQUIRE_EQUAL(
913+
stm_manifest.get(model::offset(1000))->segment_term, model::term_id(4));
914+
}

0 commit comments

Comments
 (0)