From 400507050aaf95cbbf570471520bc359b0d088d9 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Wed, 18 Dec 2024 15:52:24 -0800 Subject: [PATCH] [draft] test compaction with reconfiguration --- src/v/raft/tests/raft_fixture.cc | 11 +- src/v/raft/tests/raft_reconfiguration_test.cc | 451 +++--------------- src/v/storage/log_manager.cc | 4 +- tools/cmake_test.py | 1 + 4 files changed, 69 insertions(+), 398 deletions(-) diff --git a/src/v/raft/tests/raft_fixture.cc b/src/v/raft/tests/raft_fixture.cc index c4f632ec9cab1..eb857d80e718f 100644 --- a/src/v/raft/tests/raft_fixture.cc +++ b/src/v/raft/tests/raft_fixture.cc @@ -423,16 +423,17 @@ raft_node_instance::initialise(std::vector initial_nodes) { _base_directory, std::nullopt); }, - [this] { return storage::log_config(_base_directory, 8_MiB); }, + [this] { return storage::log_config(_base_directory, 1_MiB); }, std::ref(_features)); co_await _storage.invoke_on_all(&storage::api::start); storage::ntp_config ntp_cfg(ntp(), _base_directory); + storage::ntp_config::default_overrides ov; + ov.cleanup_policy_bitflags = model::cleanup_policy_bitflags::compaction; + ov.segment_size = 1_MiB; + ntp_cfg.set_overrides(ov); auto log = co_await _storage.local().log_mgr().manage( - std::move(ntp_cfg), - test_group, - _with_offset_translation ? model::offset_translator_batch_types() - : std::vector{}); + std::move(ntp_cfg), test_group, model::offset_translator_batch_types()); _raft = ss::make_lw_shared( _id, diff --git a/src/v/raft/tests/raft_reconfiguration_test.cc b/src/v/raft/tests/raft_reconfiguration_test.cc index 9ae16589b20b3..4b0d6c04ab02c 100644 --- a/src/v/raft/tests/raft_reconfiguration_test.cc +++ b/src/v/raft/tests/raft_reconfiguration_test.cc @@ -104,23 +104,30 @@ struct reconfiguration_test auto make_random_batches() { return make_batches( - random_generators::get_int(100, 500), [](size_t b_idx) { + random_generators::get_int(10, 100), [](size_t b_idx) { /** * Use archival metadata batches to populate offset translator */ - const auto batch_type = random_generators::random_choice( - {model::record_batch_type::raft_data, - model::record_batch_type::archival_metadata}); + + auto batch_type = model::record_batch_type::raft_data; + if (random_generators::get_int(1, 10) >= 7) { + // lower probability + batch_type = model::record_batch_type::archival_metadata; + } storage::record_batch_builder builder( batch_type, model::offset(0)); - - for (int i = 0; i < random_generators::get_int(1, 10); ++i) { + int num_records + = batch_type == model::record_batch_type::archival_metadata + ? 1 + : random_generators::get_int(5, 10); + for (int i = 0; i < num_records; ++i) { auto r_size = random_generators::get_int(32, 1_KiB); + // limited key space builder.add_raw_kv( - serde::to_iobuf(fmt::format("{}-{}", b_idx, i)), + serde::to_iobuf( + fmt::format("{}", random_generators::get_int(0, 5))), bytes_to_iobuf(random_generators::get_bytes(r_size))); } - return std::move(builder).build(); }); } @@ -162,7 +169,7 @@ static void assert_offset_translator_state_is_consistent( for (auto* n : nodes) { start_offset = std::max(n->raft()->start_offset(), start_offset); } - std::vector deltas; + chunked_vector deltas; for (model::offset o : boost::irange(start_offset, dirty_offset)) { deltas.push_back(first_raft->log()->offset_delta(o)); @@ -210,69 +217,37 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { co_await create_simple_group(initial_size); // replicate batches - auto result = co_await retry_with_leader( - model::timeout_clock::now() + 30s, - [this, consistency_lvl](raft_node_instance& leader_node) { - return leader_node.raft() - ->replicate( - make_random_batches(), replicate_options(consistency_lvl)) - .then([this](::result r) { - if (!r) { - return ss::make_ready_future<::result>( - r.error()); - } - return wait_for_offset(r.value().last_offset, nodes()); - }); - }); - - // wait for leader - ASSERT_TRUE_CORO(result.has_value()); - auto leader = co_await wait_for_leader(30s); - auto& leader_node = node(leader); - model::offset start_offset = leader_node.raft()->start_offset(); - if (snapshot) { - if (consistency_lvl == consistency_level::leader_ack) { - for (auto& [_, n] : nodes()) { - co_await n->raft()->refresh_commit_index(); - } - co_await wait_for_committed_offset( - leader_node.raft()->flushed_offset(), 10s); - } - - const auto rand_offset = co_await with_leader( - 30s, [](raft_node_instance& leader_node) { - auto committed_offset = leader_node.raft()->committed_offset(); - auto start_offset = leader_node.raft()->start_offset(); - /** - * Take snapshot at offset ranging from start_offset to the middle - * of the log - */ + auto replicate_some_batches = [&]() { + return retry_with_leader( + model::timeout_clock::now() + 30s, + [this, consistency_lvl](raft_node_instance& leader_node) { + return leader_node.raft() + ->replicate( + make_random_batches(), + replicate_options(consistency_lvl)) + .then([](::result r) { + if (!r) { + return ss::make_ready_future< + ::result>(r.error()); + } + return ss::make_ready_future< + ::result>(model::offset{1}); + }); + }) + .discard_result(); + }; - return leader_node.random_batch_base_offset( - start_offset - + model::offset((committed_offset - start_offset) / 2)); - }); + auto stopped = false; + auto replicate_f = ss::do_until( + [&] { return stopped; }, + [&] { + return replicate_some_batches().then([]() { return ss::sleep(1ms); }); + }); - const auto last_included_offset = model::prev_offset(rand_offset); - start_offset = model::next_offset(last_included_offset); - co_await with_leader( - 30s, [last_included_offset](raft_node_instance& leader_node) { - return leader_node.raft()->write_snapshot( - raft::write_snapshot_cfg(last_included_offset, {})); - }); - } - std::optional learner_start_offset; - if (use_learner_start_offset) { - learner_start_offset = co_await with_leader( - 30s, [](raft_node_instance& leader_node) { - return leader_node.random_batch_base_offset( - leader_node.raft()->dirty_offset() - model::offset(1)); - }); - if (learner_start_offset != model::offset{}) { - start_offset = *learner_start_offset; - } - } + // wait to seed some data and kick off compaction in bg + co_await ss::sleep(30s); + // wait for leader auto old_node_ids = all_ids(); auto current_node_ids = old_node_ids; @@ -305,10 +280,10 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { // update group configuration auto success = co_await retry_with_leader( model::timeout_clock::now() + 30s, - [current_nodes, learner_start_offset](raft_node_instance& leader_node) { + [current_nodes](raft_node_instance& leader_node) { return leader_node.raft() ->replace_configuration( - current_nodes, model::revision_id(0), learner_start_offset) + current_nodes, model::revision_id(0), std::nullopt) .then([](std::error_code ec) { if (ec) { return ::result(ec); @@ -318,63 +293,17 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { }); ASSERT_TRUE_CORO(success); - auto isolated_nodes - = ss::make_lw_shared>(); - switch (isolated) { - case isolated_t::none: - break; - case isolated_t::old_leader: - isolated_nodes->insert(leader); - break; - case isolated_t::old_followers: - *isolated_nodes = old_node_ids; - isolated_nodes->erase(leader); - break; - case isolated_t::random: - for (auto n : all_ids()) { - if (tests::random_bool()) { - isolated_nodes->insert(n); - } - } - break; - } - - if (!isolated_nodes->empty()) { - vlog(test_log.info, "isolating nodes: {}", *isolated_nodes); - - for (const auto& [source_id, node] : nodes()) { - node->on_dispatch([=](model::node_id dest_id, raft::msg_type) { - if ( - isolated_nodes->contains(source_id) - != isolated_nodes->contains(dest_id)) { - return ss::sleep(5s); - } - return ss::now(); - }); - } - - // heal the partition 5s later - (void)ss::sleep(5s).then([isolated_nodes] { - vlog(test_log.info, "healing the network partition"); - isolated_nodes->clear(); - }); - } + stopped = true; + co_await std::move(replicate_f); - co_await with_leader( - 30s, [this, consistency_lvl](raft_node_instance& leader_node) { - // wait for committed offset to propagate - if (consistency_lvl == raft::consistency_level::quorum_ack) { - return wait_for_committed_offset( - leader_node.raft()->committed_offset(), 30s); - } else { - return wait_for_visible_offset( - leader_node.raft()->last_visible_index(), 30s); - } - }); + co_await wait_for_reconfiguration_to_finish(current_node_ids, 240s); - co_await wait_for_reconfiguration_to_finish(current_node_ids, 30s); + auto offset = co_await with_leader(30s, [](raft_node_instance& leader) { + return leader.raft()->last_visible_index(); + }); - co_await assert_logs_equal(start_offset); + auto result = co_await wait_for_offset(offset, nodes()); + ASSERT_TRUE_CORO(result.has_value()) << "replicas did not converge"; absl::flat_hash_set current_nodes_set( current_nodes.begin(), current_nodes.end()); @@ -390,10 +319,6 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { cfg_vnodes.begin(), cfg_vnodes.end())); ASSERT_FALSE_CORO(cfg.old_config().has_value()); ASSERT_TRUE_CORO(cfg.current_config().learners.empty()); - - if (learner_start_offset && added_nodes.contains(n.get_vnode())) { - ASSERT_EQ_CORO(n.raft()->start_offset(), learner_start_offset); - } } std::vector current_node_ptrs; @@ -406,267 +331,11 @@ TEST_P_CORO(reconfiguration_test, configuration_replace_test) { INSTANTIATE_TEST_SUITE_P( validate_replacing_raft_configuration, reconfiguration_test, - testing::Combine( - testing::Values(use_snapshot::yes, use_snapshot::no), - testing::Values(1, 3), // initial size - testing::Values(0, 1, 3), // to add - testing::Values(0, 1, 3), // to remove - testing::Values(use_initial_learner_offset::yes), - testing::Values( - consistency_level::quorum_ack, consistency_level::leader_ack), - testing::Values(isolated_t::none))); - -INSTANTIATE_TEST_SUITE_P( - reconfiguration_with_isolated_nodes, - reconfiguration_test, testing::Combine( testing::Values(use_snapshot::no), - testing::Values(3), // initial size - testing::Values(2), // to add - testing::Values(0, 2), // to remove - testing::Values(use_initial_learner_offset::yes), - testing::Values( - consistency_level::quorum_ack, consistency_level::leader_ack), - testing::Values( - isolated_t::old_followers, isolated_t::old_leader, isolated_t::random))); - -namespace { -ss::future wait_for_offset( - model::offset expected, - std::vector ids, - raft_fixture& fixture) { - auto start = model::timeout_clock::now(); - // wait for visible offset to propagate - while (start + 10s > model::timeout_clock::now()) { - bool aligned = std::all_of( - ids.begin(), ids.end(), [&](model::node_id id) { - auto& rni = fixture.node(id); - vlog( - test_log.info, - "node: {}, last_visible_index: {}, expected_offset: {}", - id, - rni.raft()->last_visible_index(), - expected); - return rni.raft()->last_visible_index() >= expected; - }); - - if (aligned) { - co_return errc::success; - } - co_await ss::sleep(1s); - } - co_return raft::errc::timeout; -} -} // namespace - -TEST_F_CORO(raft_fixture, test_force_reconfiguration) { - /** - * This tests verifies the consistency of logs on all the replicas after a - * round of force reconfigurations. - */ - co_await create_simple_group(5); - co_await wait_for_leader(10s); - - bool stop = false; - - auto replicate_fiber = ss::do_until( - [&stop] { return stop; }, - [this] { - ss::lw_shared_ptr raft; - for (auto& n : nodes()) { - if (n.second->raft()->is_leader()) { - raft = n.second->raft(); - break; - } - } - - if (!raft) { - return ss::sleep(100ms); - } - return raft - ->replicate( - make_batches(10, 10, 128), - replicate_options(raft::consistency_level::quorum_ack)) - .then([this](result result) { - if (result.has_error()) { - vlog( - logger().info, - "error(replicating): {}", - result.error().message()); - } - }); - }); - - std::vector base_replica_set = all_vnodes(); - size_t reconfiguration_count = 0; - model::revision_id next_rev{1}; - - auto current_replicas = base_replica_set; - - vlog(logger().info, "initial replicas: {}", current_replicas); - auto reconfigure_until_success = [&]( - model::revision_id rev, - raft::vnode to_skip) { - auto deadline = model::timeout_clock::now() + 90s; - return ss::repeat([this, rev, deadline, to_skip, ¤t_replicas] { - vassert( - model::timeout_clock::now() < deadline, - "Timeout waiting for reconfiguration"); - auto term = node(get_leader().value()).raft()->term(); - return ss::parallel_for_each( - nodes().begin(), - nodes().end(), - [¤t_replicas, to_skip, rev]( - const raft_nodes_t::value_type& pair) { - auto raft = pair.second->raft(); - if (pair.second->get_vnode() == to_skip) { - return ss::now(); - } - return raft - ->force_replace_configuration_locally( - current_replicas, {}, rev) - .discard_result(); - }) - .then([¤t_replicas, this, rev, term] { - return wait_for_leader_change( - model::timeout_clock::now() + 10s, term) - .then([this, rev, ¤t_replicas]( - model::node_id new_leader_id) { - vlog( - logger().info, - "new leader {} elected in term: {}", - new_leader_id, - nodes()[new_leader_id]->raft()->term()); - auto replica_rev - = node(new_leader_id).raft()->config().revision_id(); - if (replica_rev < rev) { - vlog( - logger().warn, - "retrying reconfiguration to {}, requested " - "revision: {}, node {} config revision: {}", - current_replicas, - rev, - new_leader_id, - replica_rev); - return ss::stop_iteration::no; - } - vlog( - logger().info, - "successfully reconfigured to {} with revision: {}", - current_replicas, - rev); - return ss::stop_iteration::yes; - }); - }); - }); - }; - auto reconfigure_all = [&, this]() { - /** - * Switch between all 5 replicas and randomly selected 3 of them - */ - if (current_replicas.size() == 5) { - std::shuffle( - base_replica_set.begin(), - base_replica_set.end(), - random_generators::internal::gen); - current_replicas = { - base_replica_set.begin(), std::next(base_replica_set.begin(), 3)}; - } else { - current_replicas = base_replica_set; - } - - vlog(logger().info, "reconfiguring group to: {}", current_replicas); - auto to_skip = random_generators::random_choice(base_replica_set); - auto revision = next_rev++; - return reconfigure_until_success(revision, to_skip); - }; - - auto reconfigure_fiber = ss::do_until( - [&] { return stop; }, - [&] { - return reconfigure_all() - .then([&]() { - reconfiguration_count++; - - if (reconfiguration_count >= 50) { - stop = true; - } - return ss::now(); - }) - .handle_exception([](const std::exception_ptr&) { - // ignore exception - }); - }); - - auto l_transfer_fiber = ss::do_until( - [&stop] { return stop; }, - [&, this] { - std::vector not_leaders; - ss::lw_shared_ptr raft; - for (auto& n : current_replicas) { - if (node(n.id()).raft()->is_leader()) { - raft = node(n.id()).raft(); - } else { - not_leaders.push_back(n); - } - } - - if (!raft) { - return ss::sleep(100ms); - } - auto target = random_generators::random_choice(not_leaders); - return raft - ->transfer_leadership(transfer_leadership_request{ - .group = raft->group(), - .target = target.id(), - .timeout = 25ms, - }) - .then([this](transfer_leadership_reply r) { - if (r.result != raft::errc::success) { - vlog(logger().info, "error(transferring): {}", r); - } - }) - .then([] { return ss::sleep(200ms); }) - .handle_exception([](const std::exception_ptr&) { - // ignore exception - }); - }); - - co_await ss::when_all( - std::move(replicate_fiber), - std::move(reconfigure_fiber), - std::move(l_transfer_fiber)); - - logger().info("Validating log consistency"); - - auto dirty_offset = co_await retry_with_leader( - model::timeout_clock::now() + 30s, [](raft_node_instance& leader_node) { - return ::result(leader_node.raft()->dirty_offset()); - }); - - logger().info( - "Waiting for all nodes to be up to date. Dirty offset: {}", - dirty_offset.value()); - - std::vector node_ids; - node_ids.reserve(current_replicas.size()); - for (auto& vn : current_replicas) { - node_ids.push_back(vn.id()); - } - auto ec = co_await wait_for_offset(dirty_offset.value(), node_ids, *this); - ASSERT_EQ_CORO(ec, errc::success); - - using namespace testing; - for (auto o = model::offset(0); o < dirty_offset.value(); ++o) { - std::vector kafka_offsets; - kafka_offsets.reserve(node_ids.size()); - for (auto& id : node_ids) { - auto& node = this->node(id); - kafka_offsets.push_back(node.raft()->log()->from_log_offset(o)); - } - EXPECT_THAT(kafka_offsets, Each(Eq(kafka_offsets[0]))) << fmt::format( - "Offset translation failure at offset {}, kafka_offets: {}", - o, - kafka_offsets); - } -} + testing::Values(1), // initial size + testing::Values(5), // to add + testing::Values(0), // to remove + testing::Values(use_initial_learner_offset::no), + testing::Values(consistency_level::quorum_ack), + testing::Values(isolated_t::none))); diff --git a/src/v/storage/log_manager.cc b/src/v/storage/log_manager.cc index df7fa88883007..a3184c67c9764 100644 --- a/src/v/storage/log_manager.cc +++ b/src/v/storage/log_manager.cc @@ -82,8 +82,8 @@ log_config::log_config( , max_compacted_segment_size(config::mock_binding(5_GiB)) , compaction_priority(compaction_priority) , retention_bytes(config::mock_binding>(std::nullopt)) - , compaction_interval( - config::mock_binding(std::chrono::minutes(10))) + , compaction_interval(config::mock_binding( + std::chrono::seconds(1))) , log_retention( config::mock_binding>( std::chrono::minutes(10080))) diff --git a/tools/cmake_test.py b/tools/cmake_test.py index c236bdd84fdb5..8b4a9f7898a75 100755 --- a/tools/cmake_test.py +++ b/tools/cmake_test.py @@ -212,6 +212,7 @@ def __init__(self, root, prepare_command, post_command, binary, repeat, # fails. Locally, use INFO to improve runtime: the developer can # selectively re-run failing tests with more logging if needed. log_level = 'trace' if self.ci else 'info' + log_level = 'trace' def has_flag(flag, *synonyms): """Check if the args list already contains a particularly CLI flag,