storage: compaction fuzz test enhancements
WillemKauf committed Dec 23, 2024
1 parent a3112f3 commit c10d572
Showing 1 changed file with 62 additions and 9 deletions.
71 changes: 62 additions & 9 deletions src/v/storage/tests/compaction_fuzz_test.cc
@@ -9,6 +9,7 @@

#include "base/vlog.h"
#include "container/fragmented_vector.h"
#include "model/namespace.h"
#include "model/record_batch_types.h"
#include "model/timeout_clock.h"
#include "random/generators.h"
@@ -104,7 +105,14 @@ struct ot_state {
/// batches!
struct ot_state_consumer {
ss::future<ss::stop_iteration> operator()(model::record_batch rb) {
if (rb.header().type != model::record_batch_type::raft_data) {
static const auto translation_batches
= model::offset_translator_batch_types();
if (
std::find(
translation_batches.begin(),
translation_batches.end(),
rb.header().type)
!= translation_batches.end()) {
// save information about the non-data batch
st->gap_offset.push_back(rb.base_offset());
st->gap_length.push_back(rb.record_count());
@@ -122,11 +130,28 @@ struct ot_state_consumer {
/// by the set of segment base offset values.
ss::future<ot_state> arrange_and_compact(
const fragmented_vector<model::record_batch>& batches,
std::deque<model::offset> arrangement) {
std::deque<model::offset> arrangement,
bool simulate_internal_topic_compaction = false) {
std::sort(arrangement.begin(), arrangement.end());
storage::disk_log_builder b1;
storage::log_config cfg = storage::log_builder_config();
auto offset_translator_types = model::offset_translator_batch_types();
auto raft_group_id = raft::group_id{0};
storage::disk_log_builder b1(cfg, offset_translator_types, raft_group_id);

auto ns = simulate_internal_topic_compaction
? model::kafka_internal_namespace
: model::kafka_namespace;
model::ntp log_ntp(
ns,
model::topic_partition(
model::topic(random_generators::gen_alphanum_string(8)),
model::partition_id{0}));
std::exception_ptr error = nullptr;
co_await b1.start();
co_await b1.start(log_ntp);

// Must initialize translator state.
co_await b1.get_disk_log_impl().start(std::nullopt);

try {
for (const auto& b : batches) {
co_await b1.add_batch(b.copy());
@@ -138,11 +163,13 @@ ss::future<ot_state> arrange_and_compact(
}
}
ss::abort_source as;
co_await b1.apply_compaction(storage::compaction_config(
auto compact_cfg = storage::compaction_config(
batches.back().last_offset(),
std::nullopt,
ss::default_priority_class(),
as));
as);
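// Run both compaction passes: sliding-window compaction first, then
// merging of adjacent segments.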
std::ignore = co_await b1.apply_sliding_window_compaction(compact_cfg);
co_await b1.apply_adjacent_merge_compaction(compact_cfg);
} catch (...) {
error = std::current_exception();
}
@@ -183,13 +210,39 @@ std::deque<model::offset> generate_random_arrangement(
}

SEASTAR_THREAD_TEST_CASE(test_compaction_with_different_segment_arrangements) {
auto batches = generate_random_record_batches(1000, 10);
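// Use a larger workload in release builds; keep debug runs small.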
#ifdef NDEBUG
static constexpr auto num_batches = 1000;
std::vector<size_t> num_segments = {10, 100, 1000};
#else
static constexpr auto num_batches = 10;
std::vector<size_t> num_segments = {10};
#endif
auto batches = generate_random_record_batches(num_batches, 10);
auto expected_ot
= arrange_and_compact(batches, std::deque<model::offset>{}).get();
= arrange_and_compact(batches, std::deque<model::offset>{}, false).get();
for (auto num : num_segments) {
auto arrangement = generate_random_arrangement(batches, num);
auto actual_ot = arrange_and_compact(batches, arrangement, false).get();
BOOST_REQUIRE(expected_ot.gap_offset == actual_ot.gap_offset);
BOOST_REQUIRE(expected_ot.gap_length == actual_ot.gap_length);
}
}

SEASTAR_THREAD_TEST_CASE(
test_compaction_with_different_segment_arrangements_simulate_internal_topic) {
#ifdef NDEBUG
static constexpr auto num_batches = 1000;
std::vector<size_t> num_segments = {10, 100, 1000};
#else
static constexpr auto num_batches = 10;
std::vector<size_t> num_segments = {10};
#endif
auto batches = generate_random_record_batches(num_batches, 10);
auto expected_ot
= arrange_and_compact(batches, std::deque<model::offset>{}, true).get();
for (auto num : num_segments) {
auto arrangement = generate_random_arrangement(batches, num);
auto actual_ot = arrange_and_compact(batches, arrangement).get();
auto actual_ot = arrange_and_compact(batches, arrangement, true).get();
BOOST_REQUIRE(expected_ot.gap_offset == actual_ot.gap_offset);
BOOST_REQUIRE(expected_ot.gap_length == actual_ot.gap_length);
}
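A note on the key behavioral change in the consumer: previously, any batch that was not raft_data was treated as an offset-translation gap; now a gap is recorded only for batch types returned by model::offset_translator_batch_types(). Below is a minimal standalone sketch of that membership-check pattern in plain C++ (not Redpanda code; the enum values and names are hypothetical stand-ins):

#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for model::record_batch_type.
enum class batch_type : int8_t { raft_data, raft_configuration, archival_metadata };

// A batch contributes an offset-translation gap only if its type is one of
// the configured translator batch types.
bool is_translation_batch(
  batch_type t, const std::vector<batch_type>& translator_types) {
    return std::find(translator_types.begin(), translator_types.end(), t)
           != translator_types.end();
}

int main() {
    const std::vector<batch_type> translator_types{
      batch_type::raft_configuration, batch_type::archival_metadata};
    // raft_data is not a translator batch type, so this returns 0.
    return is_translation_batch(batch_type::raft_data, translator_types) ? 1 : 0;
}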
