diff --git a/src/v/cloud_topics/app.cc b/src/v/cloud_topics/app.cc
index 9323f8423ba88..ebbe3ec302add 100644
--- a/src/v/cloud_topics/app.cc
+++ b/src/v/cloud_topics/app.cc
@@ -104,11 +104,13 @@ ss::future<> app::construct(
     co_await construct_service(
       l0_gc,
+      self,
       ss::sharded_parameter([&] { return &remote->local(); }),
       bucket,
       &controller->get_health_monitor(),
       &controller->get_controller_stm(),
-      &controller->get_topics_state());
+      &controller->get_topics_state(),
+      &controller->get_members_table());
     co_await construct_service(housekeeper_manager, ss::sharded_parameter([&] {
                                    return &replicated_metastore.local();
@@ -141,6 +143,7 @@ ss::future<> app::start() {
       [](auto& ds) { return ds.start(); });
     co_await housekeeper_manager.invoke_on_all(&housekeeper_manager::start);
     co_await compaction_scheduler->start();
+    co_await l0_gc.invoke_on_all(&level_zero_gc::start);

     // When start is called, we must have registered all the callbacks before
     // this as starting the manager will invoke callbacks for partitions already
@@ -150,23 +153,6 @@ ss::future<> app::start() {
 }

 ss::future<> app::wire_up_notifications() {
-    co_await l0_gc.invoke_on_all([this](auto& gc) {
-        // Tie the starting/stopping of L0 GC to the L1 domain partition 0 so
-        // that it's a cluster wide singleton.
-        manager.local().on_l1_domain_leader([&gc](
-                                              const model::ntp& ntp,
-                                              const auto&,
-                                              const auto& partition) noexcept {
-            if (ntp.tp.partition != model::partition_id{0}) {
-                return;
-            }
-            if (partition) {
-                gc.start();
-            } else {
-                gc.pause();
-            }
-        });
-    });
     co_await topic_purge_manager.invoke_on_all([this](auto& purge_mgr) {
         manager.local().on_l1_domain_leader([&purge_mgr](
                                               const model::ntp& ntp,
diff --git a/src/v/cloud_topics/level_zero/gc/BUILD b/src/v/cloud_topics/level_zero/gc/BUILD
index 7b6e96f7e411b..c9ecf7dbf0c59 100644
--- a/src/v/cloud_topics/level_zero/gc/BUILD
+++ b/src/v/cloud_topics/level_zero/gc/BUILD
@@ -40,6 +40,7 @@ redpanda_cc_library(
         "//src/v/cloud_topics:types",
         "//src/v/container:chunked_hash_map",
         "//src/v/model",
+        "//src/v/utils:auto_fmt",
         "@seastar",
     ],
 )
diff --git a/src/v/cloud_topics/level_zero/gc/level_zero_gc.cc b/src/v/cloud_topics/level_zero/gc/level_zero_gc.cc
index c397ddabf2652..0b81e92377c8b 100644
--- a/src/v/cloud_topics/level_zero/gc/level_zero_gc.cc
+++ b/src/v/cloud_topics/level_zero/gc/level_zero_gc.cc
@@ -9,27 +9,184 @@
  */

 #include "cloud_topics/level_zero/gc/level_zero_gc.h"

+#include "base/format_to.h"
 #include "base/vlog.h"
 #include "cloud_io/remote.h"
 #include "cloud_topics/logger.h"
 #include "cloud_topics/object_utils.h"
 #include "cluster/health_monitor_frontend.h"
+#include "cluster/members_table.h"
 #include "cluster/topic_table.h"
 #include "ssx/semaphore.h"
 #include "ssx/work_queue.h"

 #include
 #include
+#include
 #include
 #include

 namespace cloud_topics {

+namespace {
+struct trie;
+struct trie_node {
+    trie_node() = default;
+    explicit trie_node(char c, const trie_node& parent)
+      : depth_(parent.depth_ + 1)
+      , char_(c)
+      , parent_(&parent) {}
+
+    void insert(int v, std::string_view s) {
+        vassert(s.size() <= 3, "String representation too long: {}", s);
+        if (s.empty()) {
+            min = max = v;
+            return;
+        }
+        min = std::min(min, v);
+        max = std::max(max, v);
+        is_leaf = false;
+        auto c = s[0];
+        vassert(c >= '0' && c <= '9', "Wrong char: {}", c);
+        auto [it, inserted] = children.try_emplace(
+          c, std::make_unique<trie_node>(c, *this));
+        // TODO: remove (impossible)
+        vassert(it != children.end(), "Bad emplace");
+        auto& [_, child] = *it;
+        vassert(child != nullptr, "Child ptr unexpectedly NULL");
unexpectedly NULL"); + // basically ok with the recursion because we'll only go 3 levels deep + child->insert(v, s.substr(1)); + } + + ss::sstring str() const { + // TODO: magic number i guess + ss::sstring result{ss::sstring::initialized_later{}, 3}; + size_t i{0}; + for (const auto* curr{this}; curr != nullptr; curr = curr->parent_) { + result[i++] = curr->char_; + } + result.resize(i); + std::ranges::reverse(result); + return result; + } + + fmt::iterator format_to(fmt::iterator it) const { + if (depth_ == 0) { + it = fmt::format_to(it, "ROOT "); + } else { + it = fmt::format_to(it, "{}: D_{} ", char_, depth_); + } + it = fmt::format_to( + it, "[{},{}] {}\n", min, max, is_leaf ? str() : "..."); + for (const auto& [c, child] : children) { + it = fmt::format_to(it, "{0:\t>{1}}: {2}", "", depth_ + 1, *child); + } + return it; + } + + bool saturated() const { + static constexpr std::array saturation = {999, 99, 9, 0}; + if (is_leaf) { + return true; + } + + vassert(max >= min, "OOPS: [{},{}]", min, max); + vassert(depth_ < saturation.size(), "Bad depth: {}", depth_); + return (max - min) == saturation.at(depth_); + } + +private: + friend struct trie; + int min{std::numeric_limits::max()}; + int max{std::numeric_limits::min()}; + std::map> children; + bool is_leaf{true}; + size_t depth_{0}; + char char_{'\0'}; + const trie_node* parent_{nullptr}; +}; + +struct trie { + void insert(prefix_range_inclusive prefixes) { + ss::sstring buf; + for (auto i = prefixes.min; i <= prefixes.max; ++i) { + buf = ssx::sformat("{:03}", i); + root.insert(i, buf); + } + } + + void prune() { + traverse([](trie_node& n) -> bool { + if (n.saturated()) { + n.is_leaf = true; + n.children.clear(); + return true; + } + return false; + }); + } + + chunked_vector collect_prefixes() const { + chunked_vector result; + traverse([&result](const trie_node& n) -> bool { + if (n.is_leaf) { + result.emplace_back(n.str()); + return true; + } + return false; + }); + return result; + } + + void traverse(ss::noncopyable_function fn) { + chunked_vector stk; + stk.push_back(&root); + while (!stk.empty()) { + auto* node = stk.back(); + vassert(node != nullptr, "Node pointer unexpectedly null"); + stk.pop_back(); + if (fn(*node)) { + continue; + } + std::ranges::transform( + node->children, std::back_inserter(stk), [](const auto& pr) { + return pr.second.get(); + }); + } + } + + void traverse(ss::noncopyable_function fn) const { + chunked_vector stk; + stk.push_back(&root); + while (!stk.empty()) { + auto* node = stk.back(); + vassert(node != nullptr, "Node pointer unexpectedly null"); + stk.pop_back(); + if (fn(*node)) { + continue; + } + std::ranges::transform( + node->children, std::back_inserter(stk), [](const auto& pr) { + return pr.second.get(); + }); + } + } + void clear() { root = {}; } + trie_node root; + fmt::iterator format_to(fmt::iterator it) const { + return fmt::format_to(it, "{}", root); + } +}; +} // namespace + class level_zero_gc::list_delete_worker { public: explicit list_delete_worker( - std::unique_ptr storage, level_zero_gc_probe& probe) + std::unique_ptr storage, + std::unique_ptr node_info, + level_zero_gc_probe& probe) : storage_(std::move(storage)) + , node_info_(std::move(node_info)) , probe_(&probe) , worker_([](std::exception_ptr eptr) { vlog(cd_log.warn, "Exception from delete worker: {}", eptr); @@ -60,37 +217,29 @@ class level_zero_gc::list_delete_worker { vlog(cd_log.info, "Stopped cloud topics list/delete worker"); } - seastar::future> - collect() { - throw std::runtime_error("not implemented"); - } - 
     bool has_capacity() const { return page_sem_.available_units() > 0; }

     seastar::future<std::expected<
       chunked_vector<cloud_storage_clients::client::list_bucket_item>,
       cloud_storage_clients::error_outcome>>
     next_page() {
-        // cached continuation is single use. pass it to list_objects and null
-        // it out immediately.
-        auto objects = co_await storage_->list_objects(
-          &as_, std::exchange(continuation_token_, std::nullopt));
-
-        // fairly naive approach to caching the token. if the list request
-        // failed, we leave the cached token empty, but if some other error
-        // occurs while processing a page, we keep the token and "skip" that
-        // page. with lexicographically ordered list results and monotonic
-        // epochs, any eligible keys in a skipped page are guaranteed to appear
-        // in a subsequent round. given the volume of L0 objects at higher
-        // throughput rates, we're going to err on the side of making progress
-        // (vs performing a perfect sweep of outstanding objects).
-        if (
-          objects.has_value()
-          && !objects.value().next_continuation_token.empty()) {
-            continuation_token_.emplace(
-              std::move(objects.value().next_continuation_token));
+        maybe_update_prefixes();
+        vlog(cd_log.debug, "Prefix range: {}", prefix_range_);
+        auto stop = ss::stop_iteration{owned_prefixes_.empty()};
+        // keep listing until we reach our assigned prefix range
+        while (!stop) {
+            auto res = co_await do_next_page(prefix_range_.value());
+            if (!res.has_value()) {
+                co_return std::unexpected{res.error()};
+            }
+            auto [objects, should_stop] = std::move(res).value();
+            stop = should_stop;
+            if (!objects.empty()) {
+                co_return std::move(objects);
+            }
         }
-        co_return objects;
+        co_return chunked_vector<
+          cloud_storage_clients::client::list_bucket_item>{};
     }

     size_t delete_objects(
@@ -163,7 +312,100 @@
     }

 private:
+    seastar::future<std::expected<
+      std::pair<
+        chunked_vector<cloud_storage_clients::client::list_bucket_item>,
+        ss::stop_iteration>,
+      cloud_storage_clients::error_outcome>>
+    do_next_page(prefix_range_inclusive prefix_range) {
+        // cached continuation is single use. pass it to list_objects and null
+        // it out immediately.
+        auto objects = co_await storage_->list_objects(
+          &as_, std::exchange(continuation_token_, std::nullopt));
+
+        if (!objects.has_value()) {
+            co_return std::unexpected{objects.error()};
+        }
+        // list_objects returned nothing so we can bail out right away
+        if (objects.value().contents.empty()) {
+            co_return std::make_pair(
+              std::move(objects.value().contents), ss::stop_iteration::yes);
+        }
+
+        // fairly naive approach to caching the token. if the list request
+        // failed, we leave the cached token empty, but if some other error
+        // occurs while processing a page, we keep the token and "skip" that
+        // page. with lexicographically ordered list results and monotonic
+        // epochs, any eligible keys in a skipped page are guaranteed to appear
+        // in a subsequent round. given the volume of L0 objects at higher
+        // throughput rates, we're going to err on the side of making progress
+        // (vs performing a perfect sweep of outstanding objects).
+        if (!objects.value().next_continuation_token.empty()) {
+            continuation_token_.emplace(
+              std::move(objects.value().next_continuation_token));
+        }
+
+        auto& contents = objects.value().contents;
+        auto total_objects = contents.size();
+        auto give_up = ss::stop_iteration::no;
+        auto [end, _] = std::ranges::remove_if(
+          contents,
+          [&prefix_range,
+           &give_up](const cloud_storage_clients::client::list_bucket_item& o) {
+              auto obj_prefix = object_path_factory::level_zero_path_to_prefix(
+                o.key);
+              // list results are lexicographically ordered, so we should give
+              // up only once we've exceeded this shard's max acceptable prefix
+              // OR if we hit an invalid key
+              if (!obj_prefix.has_value()) {
+                  vlog(
+                    cd_log.error,
+                    "Unable to parse prefix during L0 GC: {}",
+                    obj_prefix.error());
+                  give_up = ss::stop_iteration::yes;
+                  return true;
+              }
+              if (obj_prefix.value() > prefix_range.max) {
+                  give_up = ss::stop_iteration::yes;
+              }
+              return !prefix_range.contains(obj_prefix.value());
+          });
+        contents.erase_to_end(end);
+
+        vlog(
+          cd_log.debug,
+          "Response size: {} vs accepted size: {}",
+          total_objects,
+          contents.size());
+
+        co_return std::make_pair(std::move(contents), give_up);
+    }
+
+    void maybe_update_prefixes() {
+        auto range = compute_prefix_range(
+          node_info_->shard_index(), node_info_->total_shards());
+        if (prefix_range_ == range) {
+            // nothing changed, nothing to do
+            return;
+        }
+        prefix_range_ = range;
+        prefix_trie_.clear();
+        owned_prefixes_.clear();
+        if (prefix_range_.has_value()) {
+            prefix_trie_.insert(prefix_range_.value());
+            vlog(cd_log.trace, "Prefix trie before pruning: {}", prefix_trie_);
+            prefix_trie_.prune();
+            vlog(cd_log.trace, "Prefix trie after pruning: {}", prefix_trie_);
+            owned_prefixes_ = prefix_trie_.collect_prefixes();
+            vlog(
+              cd_log.trace,
+              "Prefixes for list: {}",
+              fmt::join(owned_prefixes_, ","));
+        }
+    }
+
     std::unique_ptr<object_storage> storage_;
+    std::unique_ptr<node_info> node_info_;
     level_zero_gc_probe* probe_;
     ssx::work_queue worker_;
     // TODO: configurable limits?
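Editor's note: the trie above is easier to follow with a concrete input. Below is a small standalone sketch, not part of the patch, that mirrors what `trie::insert` + `prune` + `collect_prefixes` compute: the minimal set of string prefixes covering an inclusive range of three-digit values. The function name `cover` and the recursive layout are illustrative assumptions, not the codebase's API.

```cpp
#include <cstdio>
#include <string>
#include <vector>

// Emit the minimal prefixes covering [lo, hi] within the block starting at
// `base` and spanning `width` values (width is 1000, 100, 10, or 1).
void cover(
  int base, int width, int lo, int hi, std::vector<std::string>& out) {
    if (base > hi || base + width - 1 < lo) {
        return; // block disjoint from the range
    }
    if (base >= lo && base + width - 1 <= hi) {
        // block fully inside the range: its prefix alone covers it, which is
        // what the trie calls "saturated". width 1000 -> the empty prefix.
        int digits = width == 1000 ? 0 : width == 100 ? 1 : width == 10 ? 2 : 3;
        char buf[8];
        std::snprintf(buf, sizeof buf, "%03d", base);
        out.emplace_back(buf, buf + digits);
        return;
    }
    for (int i = 0; i < 10; ++i) { // descend one digit deeper
        cover(base + i * (width / 10), width / 10, lo, hi, out);
    }
}

int main() {
    std::vector<std::string> out;
    cover(0, 1000, 120, 267, out);
    // Prints: 12 13 14 15 16 17 18 19 20 21 22 23 24 25 260 ... 267
    // (22 prefixes instead of 148 individual keys)
    for (const auto& p : out) {
        std::printf("%s ", p.c_str());
    }
    std::printf("\n");
}
```

A fully owned keyspace collapses to the single empty prefix, which is why the worker logs a handful of owned prefixes rather than one per key.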
@@ -175,6 +417,10 @@ class level_zero_gc::list_delete_worker {
     seastar::abort_source as_{};
     seastar::gate gate_{};
     std::optional<ss::sstring> continuation_token_{};
+
+    std::optional<prefix_range_inclusive> prefix_range_{};
+    trie prefix_trie_;
+    chunked_vector<ss::sstring> owned_prefixes_;
 };

 class object_storage_remote_impl : public level_zero_gc::object_storage {
@@ -202,7 +448,9 @@ class object_storage_remote_impl : public level_zero_gc::object_storage {
           object_path_factory::level_zero_data_dir(),
           std::nullopt /*delimiter*/,
           std::nullopt /*item_filter*/,
-          std::nullopt /*max_keys*/,
+          // TODO(oren): should depend on backend I guess (abs is 5000 max,
+          // though maybe it's not useful to take that many)
+          1000 /*max_keys*/,
           std::move(continuation_token));
         if (res.has_value()) {
             co_return std::move(res).assume_value();
@@ -484,10 +732,40 @@ class epoch_source_impl : public level_zero_gc::epoch_source {
     seastar::sharded<cluster::topic_table>* topic_table_;
 };

+class node_info_impl : public level_zero_gc::node_info {
+public:
+    node_info_impl(
+      model::node_id self, seastar::sharded<cluster::members_table>* mt)
+      : self_(self)
+      , members_table_(mt) {}
+
+    // TODO: use uint32 I guess
+    size_t shard_index() const final {
+        return shards_up_to(self_) + seastar::this_shard_id();
+    }
+    size_t total_shards() const final {
+        return shards_up_to(model::node_id::max());
+    }
+
+private:
+    size_t shards_up_to(model::node_id ub) const {
+        size_t total{0};
+        for (const auto& [id, node] : members_table_->local().nodes()) {
+            if (id < ub) {
+                total += node.broker.properties().cores;
+            }
+        }
+        return total;
+    }
+    model::node_id self_;
+    seastar::sharded<cluster::members_table>* members_table_;
+};
+
 level_zero_gc::level_zero_gc(
   level_zero_gc_config config,
   std::unique_ptr<object_storage> storage,
-  std::unique_ptr<epoch_source> epoch_source)
+  std::unique_ptr<epoch_source> epoch_source,
+  std::unique_ptr<node_info> node_info)
   : config_(std::move(config))
   , epoch_source_(std::move(epoch_source))
   , should_run_(false) // begin in a stopped state
   , worker_(worker())
   , probe_(config::shard_local_cfg().disable_metrics())
   , delete_worker_(
-      std::make_unique<list_delete_worker>(std::move(storage), probe_)) {}
+      std::make_unique<list_delete_worker>(
+        std::move(storage), std::move(node_info), probe_)) {}

 level_zero_gc::level_zero_gc(
+  model::node_id self,
   cloud_io::remote* remote,
   cloud_storage_clients::bucket_name bucket,
   seastar::sharded<cluster::health_monitor_frontend>* health_monitor,
   seastar::sharded<cluster::controller_stm>* controller_stm,
-  seastar::sharded<cluster::topic_table>* topic_table)
+  seastar::sharded<cluster::topic_table>* topic_table,
+  seastar::sharded<cluster::members_table>* members_table)
   : level_zero_gc(
       level_zero_gc_config{
         .deletion_grace_period
@@ -516,7 +797,8 @@ level_zero_gc::level_zero_gc(
       },
       std::make_unique<object_storage_remote_impl>(remote, std::move(bucket)),
       std::make_unique<epoch_source_impl>(
-        health_monitor, controller_stm, topic_table)) {}
+        health_monitor, controller_stm, topic_table),
+      std::make_unique<node_info_impl>(self, members_table)) {}

 level_zero_gc::~level_zero_gc() = default;

@@ -676,7 +958,7 @@ level_zero_gc::do_try_to_collect(std::optional<cluster_epoch>& max_gc_epoch) {
     std::optional<cluster_epoch> last_epoch;
     object_id::prefix_t last_prefix{0};

-    for (const auto& object : candidate_objects.value().contents) {
+    for (const auto& object : candidate_objects.value()) {
         const auto object_epoch = object_path_factory::level_zero_path_to_epoch(
           object.key);

@@ -758,4 +1040,45 @@ level_zero_gc::do_try_to_collect(std::optional<cluster_epoch>& max_gc_epoch) {
       std::move(eligible_objects), object_keys_total_bytes);
 }

+bool prefix_range_inclusive::contains(T v) const {
+    return v >= min && v <= max;
+}
+
+bool prefix_range_inclusive::operator==(
+  const prefix_range_inclusive& other) const {
+    return min == other.min && max == other.max;
+}
+
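+// Worked example: with 1000 possible prefixes and 24 shards cluster-wide, the
+// stride is 1000 / 24 = 41, so shard 0 owns [0,40], shard 1 owns [41,81], and
+// so on; the last shard (23) starts at 23 * 41 = 943 and absorbs the
+// remainder, owning [943,999]. Shard indices at or beyond the prefix count
+// get std::nullopt, i.e. no listing work.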
+std::optional<prefix_range_inclusive>
+compute_prefix_range(size_t shard_idx, size_t total_shards) {
+    // TODO: maybe if the shard id exceeds the number of prefixes we should
+    // wrap around?
+    auto total_prefixes = object_id::prefix_max + 1;
+    total_shards = std::min(total_shards, static_cast<size_t>(total_prefixes));
+    if (shard_idx >= total_shards) {
+        return std::nullopt;
+    }
+    auto stride = total_prefixes / total_shards;
+    auto min = static_cast<object_id::prefix_t>(shard_idx * stride);
+    auto max = static_cast<object_id::prefix_t>(min + stride - 1);
+    if (shard_idx == total_shards - 1) {
+        max = object_id::prefix_max;
+    }
+
+    vassert(min >= 0, "Invalid min prefix: {}", min);
+    vassert(max <= object_id::prefix_max, "Invalid max prefix: {}", max);
+
+    vlog(
+      cd_log.debug,
+      "Computed prefix range: stride {} shard idx {} prefixes [{},{}]",
+      stride,
+      shard_idx,
+      min,
+      max);
+
+    return prefix_range_inclusive{
+      .min = min,
+      .max = max,
+    };
+}
+
 } // namespace cloud_topics
diff --git a/src/v/cloud_topics/level_zero/gc/level_zero_gc.h b/src/v/cloud_topics/level_zero/gc/level_zero_gc.h
index 69d548d533568..302f9344097a5 100644
--- a/src/v/cloud_topics/level_zero/gc/level_zero_gc.h
+++ b/src/v/cloud_topics/level_zero/gc/level_zero_gc.h
@@ -14,6 +14,7 @@
 #include "cloud_topics/level_zero/gc/level_zero_gc_probe.h"
 #include "cloud_topics/types.h"
 #include "container/chunked_hash_map.h"
+#include "utils/auto_fmt.h"

 #include
 #include
@@ -31,6 +32,7 @@ namespace cluster {
 class controller_stm;
 class health_monitor_frontend;
 class topic_table;
+class members_table;
 } // namespace cluster

 namespace cloud_topics {
@@ -227,6 +229,18 @@ class level_zero_gc {
         get_partitions_max_gc_epoch(seastar::abort_source*) = 0;
     };

+    struct node_info {
+        node_info() = default;
+        node_info(const node_info&) = default;
+        node_info(node_info&&) = delete;
+        node_info& operator=(const node_info&) = default;
+        node_info& operator=(node_info&&) = delete;
+        virtual ~node_info() = default;
+
+        virtual size_t shard_index() const = 0;
+        virtual size_t total_shards() const = 0;
+    };
+
 public:
     /*
      * Construct with the given storage and epoch providers. This interface is
      * used primarily for testing.
      */
     level_zero_gc(
       level_zero_gc_config,
       std::unique_ptr<object_storage>,
-      std::unique_ptr<epoch_source>);
+      std::unique_ptr<epoch_source>,
+      std::unique_ptr<node_info>);

     /*
      * Construct with default implementations of storage and epoch providers.
      */
     level_zero_gc(
+      model::node_id,
       cloud_io::remote*,
       cloud_storage_clients::bucket_name,
       seastar::sharded<cluster::health_monitor_frontend>*,
       seastar::sharded<cluster::controller_stm>*,
-      seastar::sharded<cluster::topic_table>*);
+      seastar::sharded<cluster::topic_table>*,
+      seastar::sharded<cluster::members_table>*);

     ~level_zero_gc();

@@ -284,4 +301,15 @@ class level_zero_gc {
     std::unique_ptr<list_delete_worker> delete_worker_{};
 };

+struct prefix_range_inclusive : auto_fmt<prefix_range_inclusive> {
+    using T = object_id::prefix_t;
+    T min;
+    T max;
+    bool contains(T v) const;
+    bool operator==(const prefix_range_inclusive& other) const;
+};
+
+std::optional<prefix_range_inclusive>
+compute_prefix_range(size_t shard_idx, size_t total_shards);
+
 } // namespace cloud_topics
diff --git a/src/v/cloud_topics/level_zero/gc/tests/BUILD b/src/v/cloud_topics/level_zero/gc/tests/BUILD
index 2ff3748ef6e06..3c7ba5766443f 100644
--- a/src/v/cloud_topics/level_zero/gc/tests/BUILD
+++ b/src/v/cloud_topics/level_zero/gc/tests/BUILD
@@ -15,3 +15,19 @@ redpanda_cc_gtest(
         "@seastar",
     ],
 )
+
+redpanda_cc_gtest(
+    name = "level_zero_gc_mt_test",
+    cpu = 3,
+    timeout = "short",
+    srcs = [
+        "level_zero_gc_mt_test.cc",
+    ],
+    deps = [
+        "//src/v/cloud_topics:object_utils",
+        "//src/v/cloud_topics/level_zero/gc:level_zero_gc",
+        "//src/v/test_utils:gtest",
+        "@googletest//:gtest",
+        "@seastar",
+    ],
+)
diff --git a/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_mt_test.cc b/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_mt_test.cc
new file mode 100644
index 0000000000000..06a8ff750c47f
--- /dev/null
+++ b/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_mt_test.cc
@@ -0,0 +1,413 @@
+/*
+ * Copyright 2026 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+/*
+ * Multithreaded end-to-end tests for the sharded L0 garbage collector.
+ *
+ * These tests run the GC on multiple real Seastar shards to verify:
+ * - All shards participate in the GC process
+ * - Objects are deleted correctly when running concurrently
+ * - No crashes or races in concurrent execution
+ *
+ * Design: To avoid cross-shard data movement of chunked_vector (which has
+ * shard-local allocations), we:
+ * 1. Store object keys in a std::vector on shard 0
+ * 2. Each shard copies the keys locally and builds chunked_vector there
+ * 3. Track activity (list/delete calls) via cross-shard atomic increments
+ */
+
+#include "cloud_topics/level_zero/gc/level_zero_gc.h"
+#include "cloud_topics/object_utils.h"
+#include "test_utils/test.h"
+
+#include
+#include
+
+#include
+
+using namespace std::chrono_literals;
+
+namespace cloud_topics {
+
+/*
+ * Shared state for the object storage mock, held on shard 0.
+ * Note: We use std::vector and std::string (not ss::sstring or chunked_vector)
+ * to allow safe cross-shard copying of keys.
+ */
+struct shared_bucket_state {
+    // Objects in the bucket - use std::vector for safe copying
+    std::vector<std::string> objects;
+
+    // Track which shards made list requests (bitmask, one bit per shard)
+    std::atomic<uint64_t> shards_that_listed{0};
+
+    // Track which shards made delete requests
+    std::atomic<uint64_t> shards_that_deleted{0};
+
+    // Total objects deleted (may exceed object count due to concurrent deletes)
+    std::atomic<size_t> total_deleted{0};
+
+    // Track remaining objects (decremented on delete)
+    // TODO(oren): make this signed so underflow is detectable, and assert
+    // that it never goes negative
+    std::atomic<size_t> objects_remaining{0};
+
+    // Max GC eligible epoch
+    std::optional<int64_t> max_epoch;
+};
+
+/*
+ * Object storage mock that builds results locally on each shard.
+ * Cross-shard calls are only used for tracking and atomic updates.
+ */
+class mt_object_storage : public level_zero_gc::object_storage {
+public:
+    mt_object_storage(shared_bucket_state* bucket_state)
+      : g_bucket_state(bucket_state) {}
+    ss::future<std::expected<
+      cloud_storage_clients::client::list_bucket_result,
+      cloud_storage_clients::error_outcome>>
+    list_objects(ss::abort_source* as, std::optional<ss::sstring>) override {
+        if (as->abort_requested()) {
+            co_return std::unexpected{
+              cloud_storage_clients::error_outcome::fail};
+        }
+        auto caller_shard = ss::this_shard_id();
+        g_bucket_state->shards_that_listed.fetch_or(
+          1ULL << caller_shard, std::memory_order_relaxed);
+
+        // reasonably safe under the assumption that first we insert objects,
+        // then we start the GC
+        const auto& keys = g_bucket_state->objects;
+        // Build the result locally on this shard
+        chunked_vector<cloud_storage_clients::client::list_bucket_item> items;
+        items.reserve(keys.size());
+        for (const auto& key : keys) {
+            items.push_back(
+              cloud_storage_clients::client::list_bucket_item{
+                .key = ss::sstring(key),
+                .last_modified = std::chrono::system_clock::now() - 24h,
+              });
+        }
+
+        // Sort lexicographically to simulate real cloud storage
+        std::ranges::sort(
+          items, [](const auto& a, const auto& b) { return a.key < b.key; });
+
+        co_return cloud_storage_clients::client::list_bucket_result{
+          .contents = std::move(items),
+        };
+    }
+
+    ss::future<std::expected<void, cloud_io::upload_result>> delete_objects(
+      ss::abort_source* as,
+      chunked_vector<cloud_storage_clients::client::list_bucket_item> objects)
+      override {
+        if (as->abort_requested()) {
+            co_return std::unexpected{cloud_io::upload_result::cancelled};
+        }
+
+        auto caller_shard = ss::this_shard_id();
+        auto prev_deleted = local_deleted_keys.size();
+        local_deleted_keys.insert_range(
+          objects
+          | std::views::transform([](const auto& item) { return item.key; }));
+        auto num_deleted = local_deleted_keys.size() - prev_deleted;
+
+        g_bucket_state->shards_that_deleted.fetch_or(
+          1ULL << caller_shard, std::memory_order_relaxed);
+        g_bucket_state->total_deleted.fetch_add(
+          num_deleted, std::memory_order_relaxed);
+        // Decrement remaining count (saturating at 0)
+        auto remaining = g_bucket_state->objects_remaining.load(
+          std::memory_order_relaxed);
+        // TODO: simplify
+        while (remaining > 0) {
+            auto to_subtract = std::min(remaining, num_deleted);
+            if (g_bucket_state->objects_remaining.compare_exchange_weak(
+                  remaining,
+                  remaining - to_subtract,
+                  std::memory_order_relaxed)) {
+                break;
+            }
+        }
+
+        co_return std::expected<void, cloud_io::upload_result>();
+    }
+    shared_bucket_state* g_bucket_state;
+
+    std::unordered_set<ss::sstring> local_deleted_keys{};
+};
+
+/*
+ * Epoch source that accesses the global shared state.
+ */
+class mt_epoch_source : public level_zero_gc::epoch_source {
+public:
+    explicit mt_epoch_source(shared_bucket_state* bucket_state)
+      : g_bucket_state(bucket_state) {}
+    ss::future<std::expected<std::optional<cluster_epoch>, std::string>>
+    max_gc_eligible_epoch(ss::abort_source*) override {
+        auto epoch = co_await ss::smp::submit_to(
+          0, [this]() { return g_bucket_state->max_epoch; });
+        if (epoch.has_value()) {
+            co_return std::optional<cluster_epoch>{
+              cluster_epoch(epoch.value())};
+        }
+        co_return std::optional<cluster_epoch>{std::nullopt};
+    }
+
+    ss::future>
+    get_partitions(ss::abort_source*) override {
+        co_return std::unexpected("not implemented");
+    }
+
+    ss::future>
+    get_partitions_max_gc_epoch(ss::abort_source*) override {
+        co_return std::unexpected("not implemented");
+    }
+    shared_bucket_state* g_bucket_state;
+};
+
+/*
+ * Node info that returns shard index based on the current Seastar shard.
+ */
+class mt_node_info : public level_zero_gc::node_info {
+public:
+    size_t shard_index() const override { return ss::this_shard_id(); }
+    size_t total_shards() const override { return ss::smp::count; }
+};
+
+/*
+ * Test fixture for multithreaded GC tests.
+ */
+struct level_zero_gc_mt_test : public seastar_test {
+    ss::future<> SetUpAsync() override {
+        vassert(
+          ss::this_shard_id() == ss::shard_id{0},
+          "SetUpAsync must run on shard 0");
+        vassert(ss::smp::count > 1, "Too few shards");
+        // Create shared state on shard 0
+        g_bucket_state = std::make_unique<shared_bucket_state>();
+
+        // Start GC on all shards
+        co_await gc_.start(
+          ss::sharded_parameter([] {
+              return level_zero_gc_config{
+                .deletion_grace_period
+                = config::mock_binding(12h),
+                .throttle_progress
+                = config::mock_binding(10ms),
+                .throttle_no_progress
+                = config::mock_binding(10ms),
+              };
+          }),
+          ss::sharded_parameter([this] {
+              return std::make_unique<mt_object_storage>(g_bucket_state.get());
+          }),
+          ss::sharded_parameter([this] {
+              return std::make_unique<mt_epoch_source>(g_bucket_state.get());
+          }),
+          ss::sharded_parameter(
+            [] { return std::make_unique<mt_node_info>(); }));
+    }
+
+    ss::future<> TearDownAsync() override {
+        co_await gc_.invoke_on_all(&level_zero_gc::stop);
+        co_await gc_.stop();
+        g_bucket_state.reset();
+    }
+
+    // Add objects with various prefixes (call from shard 0 context)
+    void populate_objects(size_t count) {
+        g_bucket_state->objects.reserve(count);
+        for (size_t i = 0; i < count; ++i) {
+            auto prefix = static_cast<object_id::prefix_t>(i % 1000);
+            auto id = object_id{
+              .epoch = cluster_epoch(1),
+              .name = uuid_t::create(),
+              .prefix = prefix,
+            };
+            auto key = object_path_factory::level_zero_path(id);
+            g_bucket_state->objects.push_back(key().string());
+        }
+        g_bucket_state->objects_remaining.store(
+          count, std::memory_order_relaxed);
+    }
+
+    void set_max_epoch(int64_t epoch) { g_bucket_state->max_epoch = epoch; }
+
+    size_t get_objects_remaining() const {
+        return g_bucket_state->objects_remaining.load(
+          std::memory_order_relaxed);
+    }
+
+    size_t get_total_deleted() const {
+        return g_bucket_state->total_deleted.load(std::memory_order_relaxed);
+    }
+
+    size_t get_shards_that_listed() const {
+        return std::popcount(
+          g_bucket_state->shards_that_listed.load(std::memory_order_relaxed));
+    }
+
+    size_t get_shards_that_deleted() const {
+        return std::popcount(
+          g_bucket_state->shards_that_deleted.load(std::memory_order_relaxed));
+    }
+
+    ss::sharded<level_zero_gc> gc_;
+    // Shared state, created on shard 0. Safe because tests run sequentially
+    // and the fixture manages its lifecycle.
+    std::unique_ptr<shared_bucket_state> g_bucket_state{};
+};
+
+/*
+ * Test that all shards participate in listing when GC runs.
+ */
+TEST_F_CORO(level_zero_gc_mt_test, all_shards_participate_in_listing) {
+    // Add objects spread across all prefixes
+    populate_objects(1000);
+    set_max_epoch(100);
+
+    // Start GC on all shards
+    co_await gc_.invoke_on_all(&level_zero_gc::start);
+
+    // Wait for all shards to have made at least one list request
+    auto deadline = ss::lowres_clock::now() + 10s;
+    while (ss::lowres_clock::now() < deadline) {
+        if (get_shards_that_listed() == ss::smp::count) {
+            break;
+        }
+        co_await ss::sleep(100ms);
+    }
+
+    EXPECT_EQ(get_shards_that_listed(), ss::smp::count)
+      << "Not all shards participated in listing";
+}
+
+/*
+ * Test that objects are deleted when running on multiple shards.
+ */
+TEST_F_CORO(level_zero_gc_mt_test, objects_deleted_across_shards) {
+    // TODO(oren): should we have a test case where some prefixes aren't
+    // touched?
+    const size_t num_objects = 1000;
+    populate_objects(num_objects);
+    set_max_epoch(100);
+
+    EXPECT_EQ(get_objects_remaining(), num_objects);
+
+    // Start GC on all shards
+    co_await gc_.invoke_on_all(&level_zero_gc::start);
+
+    // Wait for all objects to be deleted
+    auto deadline = ss::lowres_clock::now() + 5s;
+    while (ss::lowres_clock::now() < deadline) {
+        if (get_objects_remaining() == 0) {
+            break;
+        }
+        co_await ss::sleep(100ms);
+    }
+
+    EXPECT_EQ(get_objects_remaining(), 0) << "Not all objects were deleted";
+    EXPECT_EQ(get_total_deleted(), num_objects) << "Deletion count wrong";
+}
+
+/*
+ * Test that multiple shards participate in deletion.
+ */
+TEST_F_CORO(level_zero_gc_mt_test, multiple_shards_delete) {
+    // Add enough objects that multiple shards should participate
+    populate_objects(1000);
+    set_max_epoch(100);
+
+    // Start GC on all shards
+    co_await gc_.invoke_on_all(&level_zero_gc::start);
+
+    // Wait for deletions to occur
+    auto deadline = ss::lowres_clock::now() + 30s;
+    while (ss::lowres_clock::now() < deadline) {
+        if (get_objects_remaining() == 0) {
+            break;
+        }
+        co_await ss::sleep(100ms);
+    }
+
+    // We expect all shards to have participated in deletion
+    EXPECT_EQ(get_shards_that_deleted(), ss::smp::count)
+      << "Not all shards participated in deletion";
+}
+
+/*
+ * Test that GC works correctly when there are no objects.
+ */
+TEST_F_CORO(level_zero_gc_mt_test, no_objects_no_crash) {
+    set_max_epoch(100);
+
+    // Start GC on all shards with an empty bucket
+    co_await gc_.invoke_on_all(&level_zero_gc::start);
+
+    // Give it some time to run
+    co_await ss::sleep(500ms);
+
+    // Should complete without crashing
+    EXPECT_EQ(get_shards_that_listed(), ss::smp::count)
+      << "Not all shards attempted to list";
+}
+
+/*
+ * Test that GC handles no eligible epoch gracefully.
+ */
+TEST_F_CORO(level_zero_gc_mt_test, no_eligible_epoch) {
+    populate_objects(100);
+    // Don't set max_epoch - it will be nullopt
+
+    // Start GC on all shards
+    co_await gc_.invoke_on_all(&level_zero_gc::start);
+
+    // Give it some time to run
+    co_await ss::sleep(500ms);
+
+    // Objects should not be deleted since there's no eligible epoch
+    EXPECT_EQ(get_objects_remaining(), 100)
+      << "Objects were deleted without eligible epoch";
+}
+
+/*
+ * Test start/pause/start cycle works correctly.
+ */
+TEST_F_CORO(level_zero_gc_mt_test, start_pause_start_cycle) {
+    populate_objects(200);
+    set_max_epoch(100);
+
+    // Start GC
+    co_await gc_.invoke_on_all(&level_zero_gc::start);
+    co_await ss::sleep(200ms);
+
+    // Pause GC
+    co_await gc_.invoke_on_all(&level_zero_gc::pause);
+    co_await ss::sleep(100ms);
+
+    // Start again
+    co_await gc_.invoke_on_all(&level_zero_gc::start);
+
+    // Wait for all objects to be deleted
+    auto deadline = ss::lowres_clock::now() + 30s;
+    while (ss::lowres_clock::now() < deadline) {
+        if (get_objects_remaining() == 0) {
+            break;
+        }
+        co_await ss::sleep(100ms);
+    }
+
+    EXPECT_EQ(get_objects_remaining(), 0)
+      << "Not all objects deleted after restart";
+}
+
+} // namespace cloud_topics
diff --git a/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc b/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc
index 73f73e372e583..962b8c6531d17 100644
--- a/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc
+++ b/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_tests.cc
@@ -138,6 +138,26 @@ class epoch_source_test_impl
     std::optional<int64_t>* epoch_;
 };

+/*
+ * Configurable node_info implementation for testing different shard
+ * configurations. Unlike the production node_info_impl which computes shard
+ * indices based on the members table, this allows direct control over the
+ * shard index and total shard count.
+ */
+class node_info_test_impl : public cloud_topics::level_zero_gc::node_info {
+public:
+    node_info_test_impl() = default;
+    node_info_test_impl(size_t shard_idx, size_t total)
+      : shard_idx_(shard_idx)
+      , total_shards_(total) {}
+    size_t shard_index() const final { return shard_idx_; }
+    size_t total_shards() const final { return total_shards_; }
+
+private:
+    size_t shard_idx_{0};
+    size_t total_shards_{1};
+};
+
 class LevelZeroGCTest : public testing::Test {
 public:
     LevelZeroGCTest(
@@ -155,7 +175,8 @@ class LevelZeroGCTest : public testing::Test {
             throttle_no_progress),
       },
       std::make_unique(&listed, &deleted, &cfg),
-      std::make_unique<epoch_source_test_impl>(&max_epoch)) {}
+      std::make_unique<epoch_source_test_impl>(&max_epoch),
+      std::make_unique<node_info_test_impl>()) {}

     void TearDown() override { gc.stop().get(); }

@@ -185,12 +206,13 @@ };

 template<typename Func>
-::testing::AssertionResult Eventually(Func func, int retries = 50) {
+::testing::AssertionResult Eventually(
+  Func func, int retries = 50, std::chrono::milliseconds delay = 20ms) {
     while (retries-- > 0) {
         if (func()) {
             return ::testing::AssertionSuccess();
         }
-        seastar::sleep_abortable(std::chrono::milliseconds(100)).get();
+        seastar::sleep_abortable(delay).get();
     }
     return ::testing::AssertionFailure() << "Timeout";
 }
@@ -419,5 +441,422 @@ TEST_F(LevelZeroGCScaleOutTest, ConcurrentDeletesPipelineSaturation) {
     this->cfg.delete_cost = 50ms;
     gc.start();
     EXPECT_TRUE(Eventually(
-      [this, expected = (size_t)n] { return deleted.size() == expected; }));
+      [this, expected = (size_t)n] { return deleted.size() == expected; },
+      50,
+      100ms));
 }
+
+// =============================================================================
+// Prefix Range Computation Tests (Static, no GC required)
+// =============================================================================
+
+class PrefixRangeComputationTest : public testing::Test {};
+
+/*
+ * With a single shard, it should handle all prefixes [0, 999].
+ */
+TEST_F(PrefixRangeComputationTest, SingleShardCoversAllPrefixes) {
+    auto range = cloud_topics::compute_prefix_range(0, 1);
+    ASSERT_TRUE(range.has_value());
+    auto [min, max] = range.value();
+    EXPECT_EQ(min, 0);
+    EXPECT_EQ(max, cloud_topics::object_id::prefix_max);
+}
+
+/*
+ * With two shards, verify non-overlapping ranges that cover the full space.
+ */
+TEST_F(PrefixRangeComputationTest, TwoShardsPartitionSpace) {
+    {
+        auto range = cloud_topics::compute_prefix_range(0, 2);
+        ASSERT_TRUE(range.has_value());
+        // First shard: [0, 500)
+        EXPECT_EQ(range->min, 0);
+        EXPECT_EQ(range->max, 499);
+    }
+
+    {
+        auto range = cloud_topics::compute_prefix_range(1, 2);
+        ASSERT_TRUE(range.has_value());
+        // Second shard: [500, 999]
+        EXPECT_EQ(range->min, 500);
+        EXPECT_EQ(range->max, cloud_topics::object_id::prefix_max);
+    }
+}
+
+/*
+ * With 1000 shards (one per prefix), each shard handles exactly one prefix.
+ */
+TEST_F(PrefixRangeComputationTest, ThousandShardsOnePerPrefix) {
+    constexpr size_t total = 1000;
+
+    for (size_t i = 0; i < total; ++i) {
+        auto range = cloud_topics::compute_prefix_range(i, total);
+        ASSERT_TRUE(range.has_value());
+        auto [min, max] = range.value();
+        if (i < total - 1) {
+            EXPECT_EQ(min, i);
+            EXPECT_EQ(max, i);
+        } else {
+            EXPECT_EQ(min, i);
+            EXPECT_EQ(max, cloud_topics::object_id::prefix_max);
+        }
+    }
+}
+
+/*
+ * TODO(oren): With more shards than prefixes (e.g., 2000 shards), half of the
+ * shards get no work; it would be better if this were balanced more evenly.
+ */
+TEST_F(PrefixRangeComputationTest, MoreShardsThanPrefixes) {
+    constexpr size_t total = 2000;
+
+    {
+        auto range = cloud_topics::compute_prefix_range(0, total);
+        ASSERT_TRUE(range.has_value());
+        EXPECT_EQ(range->min, 0);
+        EXPECT_EQ(range->max, 0);
+    }
+
+    {
+        auto range = cloud_topics::compute_prefix_range(
+          cloud_topics::object_id::prefix_max, total);
+        ASSERT_TRUE(range.has_value());
+        EXPECT_EQ(range->min, cloud_topics::object_id::prefix_max);
+        EXPECT_EQ(range->max, cloud_topics::object_id::prefix_max);
+    }
+
+    EXPECT_FALSE(
+      cloud_topics::compute_prefix_range(total - 1, total).has_value());
+}
+
+/*
+ * Verify complete coverage with 41 shards (simulating a heterogeneous cluster).
+ */
+TEST_F(PrefixRangeComputationTest, HeterogeneousCompleteCoverage) {
+    constexpr size_t total = 41;
+
+    std::vector<int> coverage_count(1000, 0);
+
+    for (size_t shard = 0; shard < total; ++shard) {
+        auto r = cloud_topics::compute_prefix_range(shard, total);
+        auto [min, max] = r.value();
+        EXPECT_GE(min, 0);
+        EXPECT_LE(max, cloud_topics::object_id::prefix_max);
+        for (auto prefix = min; prefix <= max && prefix < 1000; ++prefix) {
+            coverage_count[prefix]++;
+        }
+    }
+
+    // Verify all prefixes are covered exactly once
+    for (int prefix = 0; prefix < 1000; ++prefix) {
+        EXPECT_EQ(coverage_count[prefix], 1) << fmt::format(
+          "Prefix {} covered {} times", prefix, coverage_count[prefix]);
+    }
+}
+
+/*
+ * Base test fixture for prefix-based partitioning tests.
+ * Creates a single GC instance per test - do NOT create multiple GC instances.
+ */
+class LevelZeroGCPartitioningTest
+  : public testing::TestWithParam<std::tuple<size_t, size_t>> {
+public:
+    LevelZeroGCPartitioningTest()
+      : gc_(
+          cloud_topics::level_zero_gc_config{
+            .deletion_grace_period
+            = config::mock_binding(12h),
+            .throttle_progress
+            = config::mock_binding(10ms),
+            .throttle_no_progress
+            = config::mock_binding(10ms),
+          },
+          std::make_unique(
+            &listed_, &deleted_, &cfg_),
+          std::make_unique<epoch_source_test_impl>(&max_epoch_),
+          std::make_unique<node_info_test_impl>(
+            std::get<0>(GetParam()), std::get<1>(GetParam()))) {}
+
+    void TearDown() override { gc_.stop().get(); }
+
+    /*
+     * Insert an object with a specific prefix and epoch.
+     */
+    void add_listed_with_prefix(
+      cloud_topics::object_id::prefix_t prefix,
+      int64_t epoch,
+      std::chrono::seconds age = 24h) {
+        auto key = cloud_topics::object_path_factory::level_zero_path(
+          cloud_topics::object_id{
+            .epoch = cloud_topics::cluster_epoch(epoch),
+            .name = uuid_t::create(),
+            .prefix = prefix,
+          });
+        cloud_storage_clients::client::list_bucket_item item{
+          .key = key().string(),
+          .last_modified = std::chrono::system_clock::now() - age,
+        };
+        listed_.push_back(item);
+    }
+
+    /*
+     * Sort the listed objects to simulate lexicographic ordering from cloud
+     * storage. Must be called after adding all objects and before starting GC.
+     */
+    void sort_listed() {
+        std::ranges::sort(
+          listed_, [](const auto& a, const auto& b) { return a.key < b.key; });
+    }
+
+    /*
+     * Count how many objects have prefixes within a given range.
+     */
+    size_t count_objects_in_range(
+      cloud_topics::object_id::prefix_t min_prefix,
+      cloud_topics::object_id::prefix_t max_prefix) {
+        size_t count = 0;
+        for (const auto& obj : listed_) {
+            auto prefix
+              = cloud_topics::object_path_factory::level_zero_path_to_prefix(
+                obj.key);
+            if (
+              prefix.has_value() && prefix.value() >= min_prefix
+              && prefix.value() <= max_prefix) {
+                ++count;
+            }
+        }
+        return count;
+    }
+
+    /*
+     * Count deleted objects with prefixes within a given range.
+ */
+    size_t count_deleted_in_range(
+      cloud_topics::object_id::prefix_t min_prefix,
+      cloud_topics::object_id::prefix_t max_prefix) {
+        size_t count = 0;
+        for (const auto& key : deleted_) {
+            auto prefix
+              = cloud_topics::object_path_factory::level_zero_path_to_prefix(
+                key);
+            if (
+              prefix.has_value() && prefix.value() >= min_prefix
+              && prefix.value() <= max_prefix) {
+                ++count;
+            }
+        }
+        return count;
+    }
+
+    size_t shard_idx() const { return std::get<0>(GetParam()); }
+    size_t total_shards() const { return std::get<1>(GetParam()); }
+
+    chunked_vector<cloud_storage_clients::client::list_bucket_item> listed_;
+    std::unordered_set<ss::sstring> deleted_;
+    std::optional<int64_t> max_epoch_;
+    cloud_topics::level_zero_gc gc_;
+    gc_test_config cfg_{};
+};
+
+// =============================================================================
+// Parameterized Tests for Single Shard Configurations
+// =============================================================================
+
+/*
+ * Test that a shard only deletes objects within its assigned prefix range.
+ */
+TEST_P(LevelZeroGCPartitioningTest, ShardOnlyDeletesObjectsInRange) {
+    auto range = cloud_topics::compute_prefix_range(
+      shard_idx(), total_shards());
+    ASSERT_TRUE(range.has_value());
+    auto [min, max] = range.value();
+
+    // Add objects across the full prefix range (every 50th prefix)
+    for (int prefix = 0; prefix <= 999; prefix += 50) {
+        add_listed_with_prefix(prefix, 1);
+    }
+    sort_listed();
+
+    auto expected_in_range = count_objects_in_range(min, max);
+    max_epoch_ = 100;
+
+    gc_.start();
+
+    // Only objects in this shard's range should be deleted
+    EXPECT_TRUE(Eventually([this, expected_in_range] {
+        return deleted_.size() == expected_in_range;
+    }));
+
+    // Verify all deleted objects are within range
+    EXPECT_EQ(count_deleted_in_range(min, max), deleted_.size());
+}
+
+/*
+ * Test pagination across prefix boundaries.
+ */
+TEST_P(LevelZeroGCPartitioningTest, PaginationWithinRange) {
+    auto range = cloud_topics::compute_prefix_range(
+      shard_idx(), total_shards());
+    ASSERT_TRUE(range.has_value());
+    auto [min, max] = range.value();
+
+    // Use small pages to force pagination
+    cfg_.list_page_size = 5;
+
+    // Add many objects within this shard's range
+    for (auto prefix = min; prefix <= max && prefix < 1000; prefix += 5) {
+        add_listed_with_prefix(prefix, 1);
+    }
+    sort_listed();
+
+    auto expected = count_objects_in_range(min, max);
+    max_epoch_ = 100;
+
+    gc_.start();
+
+    EXPECT_TRUE(
+      Eventually([this, expected] { return deleted_.size() == expected; }));
+}
+
+/*
+ * Test that epoch filtering still works with prefix partitioning.
+ */
+TEST_P(LevelZeroGCPartitioningTest, EpochFilteringWithPartitioning) {
+    auto range = cloud_topics::compute_prefix_range(
+      shard_idx(), total_shards());
+    ASSERT_TRUE(range.has_value());
+    auto [min, max] = range.value();
+
+    // Add objects with various epochs, using prefixes in our range
+    if (min < 1000) {
+        add_listed_with_prefix(min, 50); // epoch 50, eligible
+        add_listed_with_prefix(min, 100); // epoch 100, boundary
+        add_listed_with_prefix(min, 150); // epoch 150, not eligible
+        add_listed_with_prefix(min, 200); // epoch 200, not eligible
+    }
+    sort_listed();
+
+    max_epoch_ = 100; // Only epochs <= 100 are eligible
+
+    gc_.start();
+
+    // Only 2 objects (epochs 50 and 100) should be deleted
+    EXPECT_TRUE(Eventually([this] { return deleted_.size() == 2; }));
+}
+
+/*
+ * Test that age filtering still works with prefix partitioning.
+ */
+TEST_P(LevelZeroGCPartitioningTest, AgeFilteringWithPartitioning) {
+    auto range = cloud_topics::compute_prefix_range(
+      shard_idx(), total_shards());
+    ASSERT_TRUE(range.has_value());
+    auto [min, max] = range.value();
+
+    if (min < 1000) {
+        add_listed_with_prefix(min, 1, 24h); // old enough
+        add_listed_with_prefix(
+          min + 1 > max ? min : min + 1, 1, 24h); // old enough
+        add_listed_with_prefix(
+          min + 2 > max ? min : min + 2, 1, 1h); // too young
+        add_listed_with_prefix(
+          min + 3 > max ? min : min + 3, 1, 1h); // too young
+    }
+    sort_listed();
+
+    max_epoch_ = 100;
+
+    gc_.start();
+
+    // Only 2 old objects should be deleted
+    EXPECT_TRUE(Eventually([this] { return deleted_.size() == 2; }));
+}
+
+/*
+ * Test behavior when this shard has no objects in its range.
+ */
+TEST_P(LevelZeroGCPartitioningTest, NoObjectsInRange) {
+    auto range = cloud_topics::compute_prefix_range(
+      shard_idx(), total_shards());
+    ASSERT_TRUE(range.has_value());
+    auto [min, max] = range.value();
+
+    // Add objects outside this shard's range
+    if (min > 0) {
+        // Add objects before our range
+        add_listed_with_prefix(0, 1);
+    }
+    if (max < 999) {
+        // Add objects after our range
+        add_listed_with_prefix(999, 1);
+    }
+    sort_listed();
+
+    max_epoch_ = 100;
+
+    gc_.start();
+
+    // No objects should be deleted since none are in our range
+    EXPECT_FALSE(Eventually([this] { return !deleted_.empty(); }));
+}
+
+/*
+ * Test objects at exact boundary prefix values.
+ */
+TEST_P(LevelZeroGCPartitioningTest, ObjectsAtBoundaries) {
+    auto range = cloud_topics::compute_prefix_range(
+      shard_idx(), total_shards());
+    ASSERT_TRUE(range.has_value());
+    auto [min, max] = range.value();
+
+    // Add object at min boundary
+    add_listed_with_prefix(min, 1);
+    // Add object at max boundary
+    if (max < 1000) {
+        add_listed_with_prefix(max, 1);
+    }
+    sort_listed();
+
+    auto expected = count_objects_in_range(min, max);
+    max_epoch_ = 100;
+
+    gc_.start();
+
+    EXPECT_TRUE(
+      Eventually([this, expected] { return deleted_.size() == expected; }));
+}
+
+// Instantiate tests for various shard configurations
+INSTANTIATE_TEST_SUITE_P(
+  VariousShardCounts,
+  LevelZeroGCPartitioningTest,
+  testing::Values(
+    std::make_tuple(0, 1), // Single shard covering all prefixes
+    std::make_tuple(0, 2), // First half [0, 500)
+    std::make_tuple(1, 2), // Second half [500, 999]
+    std::make_tuple(0, 10), // First range [0, 100)
+    std::make_tuple(4, 10), // Middle range [400, 500)
+    std::make_tuple(9, 10), // Last range [900, 999]
+    std::make_tuple(0, 24), // First shard
+    std::make_tuple(23, 24) // Last shard (handles remainder)
+  ),
+  [](const testing::TestParamInfo<std::tuple<size_t, size_t>>& info) {
+      return "Shard_" + std::to_string(std::get<0>(info.param)) + "_Of_"
+             + std::to_string(std::get<1>(info.param));
+  });
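A closing note on how a shard finds its global index: node_info_impl sums the core counts of all lower-id nodes in the members table and adds the local shard id. The sketch below is a standalone illustration with hypothetical inputs; `cores_by_node` and `global_shard_index` are illustrative names, not the patch's API.

```cpp
#include <cstddef>
#include <cstdio>
#include <map>

// Mirror of node_info_impl's indexing: shards of lower-id nodes come first,
// then this node's local shards in order.
std::size_t global_shard_index(
  const std::map<int, int>& cores_by_node, int self, int local_shard) {
    std::size_t total = 0;
    for (const auto& [id, cores] : cores_by_node) {
        if (id < self) {
            total += cores;
        }
    }
    return total + local_shard;
}

int main() {
    // Heterogeneous cluster: node 0 has 8 cores, node 1 has 16, node 2 has 8,
    // so there are 32 global shards in total.
    std::map<int, int> cores_by_node{{0, 8}, {1, 16}, {2, 8}};
    // Node 1, local shard 3 -> global index 8 + 3 = 11.
    std::printf("%zu\n", global_shard_index(cores_by_node, 1, 3));
}
```

Paired with compute_prefix_range, every (node, shard) pair gets a disjoint slice of the prefix space, which is exactly the invariant the HeterogeneousCompleteCoverage test asserts.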