From c10d5722f7321e413cb9c852f191b8765e7262b2 Mon Sep 17 00:00:00 2001 From: Willem Kaufmann Date: Fri, 20 Dec 2024 17:25:58 -0500 Subject: [PATCH] `storage`: compaction fuzz test enhancements --- src/v/storage/tests/compaction_fuzz_test.cc | 71 ++++++++++++++++++--- 1 file changed, 62 insertions(+), 9 deletions(-) diff --git a/src/v/storage/tests/compaction_fuzz_test.cc b/src/v/storage/tests/compaction_fuzz_test.cc index e82194c374f6..3c8e429e2bc0 100644 --- a/src/v/storage/tests/compaction_fuzz_test.cc +++ b/src/v/storage/tests/compaction_fuzz_test.cc @@ -9,6 +9,7 @@ #include "base/vlog.h" #include "container/fragmented_vector.h" +#include "model/namespace.h" #include "model/record_batch_types.h" #include "model/timeout_clock.h" #include "random/generators.h" @@ -104,7 +105,14 @@ struct ot_state { /// batches! struct ot_state_consumer { ss::future operator()(model::record_batch rb) { - if (rb.header().type != model::record_batch_type::raft_data) { + static const auto translation_batches + = model::offset_translator_batch_types(); + if ( + std::find( + translation_batches.begin(), + translation_batches.end(), + rb.header().type) + != translation_batches.end()) { // save information about the non-data batch st->gap_offset.push_back(rb.base_offset()); st->gap_length.push_back(rb.record_count()); @@ -122,11 +130,28 @@ struct ot_state_consumer { /// by the set of segment base offset values. ss::future arrange_and_compact( const fragmented_vector& batches, - std::deque arrangement) { + std::deque arrangement, + bool simulate_internal_topic_compaction = false) { std::sort(arrangement.begin(), arrangement.end()); - storage::disk_log_builder b1; + storage::log_config cfg = storage::log_builder_config(); + auto offset_translator_types = model::offset_translator_batch_types(); + auto raft_group_id = raft::group_id{0}; + storage::disk_log_builder b1(cfg, offset_translator_types, raft_group_id); + + auto ns = simulate_internal_topic_compaction + ? model::kafka_internal_namespace + : model::kafka_namespace; + model::ntp log_ntp( + ns, + model::topic_partition( + model::topic(random_generators::gen_alphanum_string(8)), + model::partition_id{0})); std::exception_ptr error = nullptr; - co_await b1.start(); + co_await b1.start(log_ntp); + + // Must initialize translator state. + co_await b1.get_disk_log_impl().start(std::nullopt); + try { for (const auto& b : batches) { co_await b1.add_batch(b.copy()); @@ -138,11 +163,13 @@ ss::future arrange_and_compact( } } ss::abort_source as; - co_await b1.apply_compaction(storage::compaction_config( + auto compact_cfg = storage::compaction_config( batches.back().last_offset(), std::nullopt, ss::default_priority_class(), - as)); + as); + std::ignore = co_await b1.apply_sliding_window_compaction(compact_cfg); + co_await b1.apply_adjacent_merge_compaction(compact_cfg); } catch (...) { error = std::current_exception(); } @@ -183,13 +210,39 @@ std::deque generate_random_arrangement( } SEASTAR_THREAD_TEST_CASE(test_compaction_with_different_segment_arrangements) { - auto batches = generate_random_record_batches(1000, 10); +#ifdef NDEBUG + static constexpr auto num_batches = 1000; + std::vector num_segments = {10, 100, 1000}; +#else + static constexpr auto num_batches = 10; + std::vector num_segments = {10}; +#endif + auto batches = generate_random_record_batches(num_batches, 10); auto expected_ot - = arrange_and_compact(batches, std::deque{}).get(); + = arrange_and_compact(batches, std::deque{}, false).get(); + for (auto num : num_segments) { + auto arrangement = generate_random_arrangement(batches, num); + auto actual_ot = arrange_and_compact(batches, arrangement, false).get(); + BOOST_REQUIRE(expected_ot.gap_offset == actual_ot.gap_offset); + BOOST_REQUIRE(expected_ot.gap_length == actual_ot.gap_length); + } +} + +SEASTAR_THREAD_TEST_CASE( + test_compaction_with_different_segment_arrangements_simulate_internal_topic) { +#ifdef NDEBUG + static constexpr auto num_batches = 1000; std::vector num_segments = {10, 100, 1000}; +#else + static constexpr auto num_batches = 10; + std::vector num_segments = {10}; +#endif + auto batches = generate_random_record_batches(num_batches, 10); + auto expected_ot + = arrange_and_compact(batches, std::deque{}, true).get(); for (auto num : num_segments) { auto arrangement = generate_random_arrangement(batches, num); - auto actual_ot = arrange_and_compact(batches, arrangement).get(); + auto actual_ot = arrange_and_compact(batches, arrangement, true).get(); BOOST_REQUIRE(expected_ot.gap_offset == actual_ot.gap_offset); BOOST_REQUIRE(expected_ot.gap_length == actual_ot.gap_length); }