storage: compaction alignment test #24636

Open · wants to merge 3 commits into base: dev
22 changes: 22 additions & 0 deletions src/v/storage/tests/BUILD
@@ -853,3 +853,25 @@ redpanda_cc_gtest(
"@seastar//:testing",
],
)

redpanda_cc_btest(
name = "compaction_fuzz_test",
timeout = "short",
srcs = [
"compaction_fuzz_test.cc",
],
deps = [
":disk_log_builder",
"//src/v/base",
"//src/v/container:fragmented_vector",
"//src/v/model",
"//src/v/random:generators",
"//src/v/storage",
"//src/v/storage:record_batch_builder",
"//src/v/test_utils:seastar_boost",
"@abseil-cpp//absl/container:btree",
"@boost//:test",
"@seastar",
"@seastar//:testing",
],
)
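For reference (not part of this diff): with this BUILD entry the new test becomes a standalone target, presumably runnable as bazel test //src/v/storage/tests:compaction_fuzz_test.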
1 change: 1 addition & 0 deletions src/v/storage/tests/CMakeLists.txt
@@ -37,6 +37,7 @@ rp_test(
file_sanitizer_test.cc
compaction_reducer_test.cc
batch_consumer_utils_test.cc
compaction_fuzz_test.cc
LIBRARIES v::seastar_testing_main v::storage_test_utils v::model_test_utils
LABELS storage
ARGS "-- -c 1"
249 changes: 249 additions & 0 deletions src/v/storage/tests/compaction_fuzz_test.cc
@@ -0,0 +1,249 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "base/vlog.h"
#include "container/fragmented_vector.h"
#include "model/namespace.h"
#include "model/record_batch_types.h"
#include "model/timeout_clock.h"
#include "random/generators.h"
#include "storage/record_batch_builder.h"
#include "storage/tests/utils/disk_log_builder.h"
#include "storage/types.h"

#include <seastar/core/io_priority_class.hh>
#include <seastar/core/sleep.hh>
#include <seastar/testing/thread_test_case.hh>

#include <absl/container/btree_map.h>
#include <boost/test/tools/old/interface.hpp>
#include <boost/test/unit_test.hpp>

#include <exception>

using namespace std::chrono_literals;

namespace {
ss::logger cmp_testlog("cmp-fuzz");
} // anonymous namespace

static model::record_batch make_random_batch(
model::offset offset,
bool empty,
model::record_batch_type type,
std::vector<std::optional<ss::sstring>> keys,
std::vector<std::optional<ss::sstring>> values,
int num_records) {
BOOST_REQUIRE(keys.size() == values.size());
storage::record_batch_builder builder(type, offset);
auto to_iobuf = [](std::optional<ss::sstring> x) {
std::optional<iobuf> result;
if (x.has_value()) {
iobuf buf;
buf.append(x->data(), x->size());
result = std::move(buf);
}
return result;
};
if (!empty) {
for (int i = 0; i < num_records; i++) {
auto key = random_generators::random_choice(keys);
auto val = random_generators::random_choice(values);
builder.add_raw_kv(to_iobuf(key), to_iobuf(val));
}
}
return std::move(builder).build();
}
Comment on lines +36 to +62

Member: is this doing something specific to the test? pretty sure we have at least a few versions of random batch creation utilities in the tree

Contributor Author: Doesn't look like we have another random batch builder with the same signature as the one written here.

It's possible we could use some existing batch building utilities, but Evgeny probably added this for ease of use in this test. We don't assert on values, but the key generation is obviously important for compaction here.


static fragmented_vector<model::record_batch>
generate_random_record_batches(int num, int cardinality) {
fragmented_vector<model::record_batch> result;
std::vector<std::optional<ss::sstring>> keys;
std::vector<std::optional<ss::sstring>> values;
std::vector<model::record_batch_type> types{
model::record_batch_type::raft_configuration,
model::record_batch_type::raft_data,
model::record_batch_type::archival_metadata,
};
for (int i = 0; i < cardinality; i++) {
if (i == 0) {
keys.emplace_back(std::nullopt);
} else {
keys.emplace_back(
random_generators::gen_alphanum_string(20, false));
}
values.emplace_back(random_generators::gen_alphanum_string(20, false));
}
// Generate actual batches
model::offset current{0};
for (int i = 0; i < num; i++) {
result.emplace_back(make_random_batch(
current,
false,
random_generators::random_choice(types),
keys,
values,
random_generators::get_int(1, 10)));
current = model::next_offset(result.back().last_offset());
}
return result;
}

/// Offset translator state observed at some point in time: the base offsets
/// and record counts of the non-data batches that create gaps in Kafka
/// offset translation.
struct ot_state {
std::deque<model::offset> gap_offset;
std::deque<int64_t> gap_length;
};

/// Consumer that collects the base offset and record count of every
/// non-data (offset translator) batch into an ot_state.
struct ot_state_consumer {
ss::future<ss::stop_iteration> operator()(model::record_batch rb) {
static const auto translation_batches
= model::offset_translator_batch_types();
if (
Contributor Author: A different condition than if (rb.header().type != model::record_batch_type::raft_data).

Otherwise, compaction placeholder batches may find their way into the ot_state with a gap_length of 0.

std::find(
translation_batches.begin(),
translation_batches.end(),
rb.header().type)
!= translation_batches.end()) {
// save information about the non-data batch
st->gap_offset.push_back(rb.base_offset());
st->gap_length.push_back(rb.record_count());
}
co_return ss::stop_iteration::no;
}

void end_of_stream() {}

ot_state* st;
};
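For illustration only (not part of the diff): the simpler filter that the inline comment above refers to. The assumption, per that comment, is that compaction can leave placeholder batches which are not raft_data but also not offset-translator types, so this version would record them as zero-length gaps:

// NOT the condition used above -- shown only to illustrate the review comment.
if (rb.header().type != model::record_batch_type::raft_data) {
    st->gap_offset.push_back(rb.base_offset());
    // record_count() can be 0 for compaction placeholder batches,
    // which would put a bogus zero-length gap into ot_state.
    st->gap_length.push_back(rb.record_count());
}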

/// Insert data into the log and maintain a particular
/// segment arrangement. The arrangement is defined
/// by the set of segment base offset values.
ss::future<ot_state> arrange_and_compact(
const fragmented_vector<model::record_batch>& batches,
std::deque<model::offset> arrangement,
bool simulate_internal_topic_compaction = false) {
std::sort(arrangement.begin(), arrangement.end());
storage::log_config cfg = storage::log_builder_config();
auto offset_translator_types = model::offset_translator_batch_types();
auto raft_group_id = raft::group_id{0};
storage::disk_log_builder b1(cfg, offset_translator_types, raft_group_id);

auto ns = simulate_internal_topic_compaction
? model::kafka_internal_namespace
: model::kafka_namespace;
model::ntp log_ntp(
ns,
model::topic_partition(
model::topic(random_generators::gen_alphanum_string(8)),
model::partition_id{0}));
std::exception_ptr error = nullptr;
co_await b1.start(log_ntp);

// Must initialize translator state.
co_await b1.get_disk_log_impl().start(std::nullopt);

try {
for (const auto& b : batches) {
co_await b1.add_batch(b.copy());
if (
!arrangement.empty() && b.base_offset() >= arrangement.front()) {
arrangement.pop_front();
co_await b1.get_disk_log_impl().force_roll(
ss::default_priority_class());
}
}
ss::abort_source as;
auto compact_cfg = storage::compaction_config(
batches.back().last_offset(),
std::nullopt,
ss::default_priority_class(),
as);
std::ignore = co_await b1.apply_sliding_window_compaction(compact_cfg);
co_await b1.apply_adjacent_merge_compaction(compact_cfg);
} catch (...) {
error = std::current_exception();
}
auto reader = co_await b1.get_disk_log_impl().make_reader(
storage::log_reader_config(
model::offset{0}, model::offset::max(), ss::default_priority_class()));
ot_state st{};
co_await std::move(reader).consume(
ot_state_consumer{.st = &st}, model::no_timeout);
co_await b1.stop();
if (error) {
vlog(
cmp_testlog.error,
"Error triggered while appending or compacting: {}",
error);
}
BOOST_REQUIRE(error == nullptr);
co_return st;
}
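For illustration (not part of the diff), a minimal sketch of the invariant this helper is used to check: the offset-translator state left behind by compaction should not depend on where segment boundaries fall. It assumes a seastar thread test like the cases below; the batch counts and roll points are arbitrary:

// Baseline: no forced rolls, everything lands in one segment.
auto batches = generate_random_record_batches(/*num=*/100, /*cardinality=*/10);
auto baseline
  = arrange_and_compact(batches, std::deque<model::offset>{}).get();
// Force a segment roll at the base offset of every 10th batch.
std::deque<model::offset> arrangement;
for (size_t i = 0; i < batches.size(); i += 10) {
    arrangement.push_back(batches[i].base_offset());
}
auto rolled = arrange_and_compact(batches, arrangement).get();
// Segment alignment must not change the post-compaction translation state.
BOOST_REQUIRE(baseline.gap_offset == rolled.gap_offset);
BOOST_REQUIRE(baseline.gap_length == rolled.gap_length);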

/// This function generates random alignment based on the set of batches
/// that will be written into the log.
std::deque<model::offset> generate_random_arrangement(
const fragmented_vector<model::record_batch>& batches, size_t num_segments) {
BOOST_REQUIRE(num_segments <= batches.size());
std::deque<model::offset> arr;
// Use reservoir sampling to produce num_segments base offsets
for (size_t i = 0; i < num_segments; i++) {
arr.push_back(batches[i].base_offset());
}
for (size_t i = num_segments; i < batches.size(); i++) {
auto r = random_generators::get_int<size_t>(0, i);
if (r < num_segments) {
arr[r] = batches[i].base_offset();
}
}
return arr;
}
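For illustration only (not part of the diff): the loop above is a textbook reservoir sample over the batch base offsets. Assuming only the base offsets are needed, C++17 std::sample expresses the same selection; it emits the chosen offsets in input order, which is harmless here because arrange_and_compact sorts the arrangement anyway. std::mt19937 stands in for the random_generators engine:

#include <algorithm>
#include <iterator>
#include <random>

std::deque<model::offset> sample_arrangement(
  const fragmented_vector<model::record_batch>& batches, size_t num_segments) {
    // Collect the candidate base offsets, then sample num_segments of them
    // uniformly without replacement.
    std::vector<model::offset> base_offsets;
    base_offsets.reserve(batches.size());
    for (const auto& b : batches) {
        base_offsets.push_back(b.base_offset());
    }
    std::deque<model::offset> arr;
    std::mt19937 gen{std::random_device{}()};
    std::sample(
      base_offsets.begin(),
      base_offsets.end(),
      std::back_inserter(arr),
      num_segments,
      gen);
    return arr;
}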

SEASTAR_THREAD_TEST_CASE(test_compaction_with_different_segment_arrangements) {
#ifdef NDEBUG
static constexpr auto num_batches = 1000;
std::vector<size_t> num_segments = {10, 100, 1000};
#else
static constexpr auto num_batches = 10;
std::vector<size_t> num_segments = {10};
#endif
auto batches = generate_random_record_batches(num_batches, 10);
auto expected_ot
= arrange_and_compact(batches, std::deque<model::offset>{}, false).get();
for (auto num : num_segments) {
auto arrangement = generate_random_arrangement(batches, num);
auto actual_ot = arrange_and_compact(batches, arrangement, false).get();
BOOST_REQUIRE(expected_ot.gap_offset == actual_ot.gap_offset);
BOOST_REQUIRE(expected_ot.gap_length == actual_ot.gap_length);
}
}

SEASTAR_THREAD_TEST_CASE(
test_compaction_with_different_segment_arrangements_simulate_internal_topic) {
#ifdef NDEBUG
static constexpr auto num_batches = 1000;
std::vector<size_t> num_segments = {10, 100, 1000};
#else
static constexpr auto num_batches = 10;
std::vector<size_t> num_segments = {10};
#endif
auto batches = generate_random_record_batches(num_batches, 10);
auto expected_ot
= arrange_and_compact(batches, std::deque<model::offset>{}, true).get();
for (auto num : num_segments) {
auto arrangement = generate_random_arrangement(batches, num);
auto actual_ot = arrange_and_compact(batches, arrangement, true).get();
BOOST_REQUIRE(expected_ot.gap_offset == actual_ot.gap_offset);
BOOST_REQUIRE(expected_ot.gap_length == actual_ot.gap_length);
}
}
2 changes: 1 addition & 1 deletion src/v/storage/tests/storage_e2e_test.cc
@@ -4061,7 +4061,7 @@ FIXTURE_TEST(test_skipping_compaction_below_start_offset, log_builder_fixture) {
auto& first_seg = log.segments().front();
BOOST_REQUIRE_EQUAL(first_seg->finished_self_compaction(), false);

-b.apply_compaction(cfg.compact, *new_start_offset).get();
+b.apply_adjacent_merge_compaction(cfg.compact, *new_start_offset).get();

BOOST_REQUIRE_EQUAL(first_seg->finished_self_compaction(), false);

7 changes: 6 additions & 1 deletion src/v/storage/tests/utils/disk_log_builder.cc
@@ -158,11 +158,16 @@ disk_log_builder::apply_retention(gc_config cfg) {
return get_disk_log_impl().do_gc(cfg);
}

-ss::future<> disk_log_builder::apply_compaction(
+ss::future<> disk_log_builder::apply_adjacent_merge_compaction(
compaction_config cfg, std::optional<model::offset> new_start_offset) {
return get_disk_log_impl().adjacent_merge_compact(cfg, new_start_offset);
}

ss::future<bool> disk_log_builder::apply_sliding_window_compaction(
compaction_config cfg, std::optional<model::offset> new_start_offset) {
return get_disk_log_impl().sliding_window_compact(cfg, new_start_offset);
}

ss::future<bool>
disk_log_builder::update_start_offset(model::offset start_offset) {
return get_disk_log_impl().update_start_offset(start_offset);
5 changes: 4 additions & 1 deletion src/v/storage/tests/utils/disk_log_builder.h
@@ -311,7 +311,10 @@ class disk_log_builder {
model::timestamp collection_upper_bound,
std::optional<size_t> max_partition_retention_size);
ss::future<std::optional<model::offset>> apply_retention(gc_config cfg);
-ss::future<> apply_compaction(
+ss::future<> apply_adjacent_merge_compaction(
compaction_config cfg,
std::optional<model::offset> new_start_offset = std::nullopt);
ss::future<bool> apply_sliding_window_compaction(
compaction_config cfg,
std::optional<model::offset> new_start_offset = std::nullopt);
ss::future<bool> update_start_offset(model::offset start_offset);
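For reference (not part of the diff): the two helpers are intended to be composed the way the new fuzz test does, i.e. sliding-window compaction followed by adjacent-segment merging. A minimal sketch, assuming a started disk_log_builder b and a caller-supplied upper offset bound max_collectible:

ss::abort_source as;
auto compact_cfg = storage::compaction_config(
  max_collectible, std::nullopt, ss::default_priority_class(), as);
// Order mirrors compaction_fuzz_test.cc: window compaction first, then merge.
std::ignore = co_await b.apply_sliding_window_compaction(compact_cfg);
co_await b.apply_adjacent_merge_compaction(compact_cfg);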