22 changes: 4 additions & 18 deletions src/v/cloud_topics/app.cc
@@ -104,11 +104,13 @@ ss::future<> app::construct(

co_await construct_service(
l0_gc,
self,
ss::sharded_parameter([&] { return &remote->local(); }),
bucket,
&controller->get_health_monitor(),
&controller->get_controller_stm(),
&controller->get_topics_state());
&controller->get_topics_state(),
&controller->get_members_table());

co_await construct_service(housekeeper_manager, ss::sharded_parameter([&] {
return &replicated_metastore.local();
@@ -141,6 +143,7 @@ ss::future<> app::start() {
[](auto& ds) { return ds.start(); });
co_await housekeeper_manager.invoke_on_all(&housekeeper_manager::start);
co_await compaction_scheduler->start();
co_await l0_gc.invoke_on_all(&level_zero_gc::start);

// When start is called, we must have registered all the callbacks before
// this as starting the manager will invoke callbacks for partitions already
@@ -150,23 +153,6 @@ ss::future<> app::start() {
}

ss::future<> app::wire_up_notifications() {
co_await l0_gc.invoke_on_all([this](auto& gc) {
// Tie the starting/stopping of L0 GC to the L1 domain partition 0 so
// that it's a cluster wide singleton.
manager.local().on_l1_domain_leader([&gc](
const model::ntp& ntp,
const auto&,
const auto& partition) noexcept {
if (ntp.tp.partition != model::partition_id{0}) {
return;
}
if (partition) {
gc.start();
} else {
gc.pause();
}
});
});
co_await topic_purge_manager.invoke_on_all([this](auto& purge_mgr) {
manager.local().on_l1_domain_leader([&purge_mgr](
const model::ntp& ntp,
1 change: 1 addition & 0 deletions src/v/cloud_topics/level_zero/gc/BUILD
@@ -40,6 +40,7 @@ redpanda_cc_library(
"//src/v/cloud_topics:types",
"//src/v/container:chunked_hash_map",
"//src/v/model",
"//src/v/utils:auto_fmt",
"@seastar",
],
)
208 changes: 176 additions & 32 deletions src/v/cloud_topics/level_zero/gc/level_zero_gc.cc
@@ -14,12 +14,14 @@
#include "cloud_topics/logger.h"
#include "cloud_topics/object_utils.h"
#include "cluster/health_monitor_frontend.h"
#include "cluster/members_table.h"
#include "cluster/topic_table.h"
#include "ssx/semaphore.h"
#include "ssx/work_queue.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/as_future.hh>

@@ -28,8 +30,11 @@ namespace cloud_topics {
class level_zero_gc::list_delete_worker {
public:
explicit list_delete_worker(
std::unique_ptr<object_storage> storage, level_zero_gc_probe& probe)
std::unique_ptr<object_storage> storage,
std::unique_ptr<node_info> node_info,
level_zero_gc_probe& probe)
: storage_(std::move(storage))
, node_info_(std::move(node_info))
, probe_(&probe)
, worker_([](std::exception_ptr eptr) {
vlog(cd_log.warn, "Exception from delete worker: {}", eptr);
@@ -60,37 +65,30 @@ class level_zero_gc::list_delete_worker {
vlog(cd_log.info, "Stopped cloud topics list/delete worker");
}

seastar::future<std::expected<size_t, level_zero_gc::collection_error>>
collect() {
throw std::runtime_error("not implemented");
}

bool has_capacity() const { return page_sem_.available_units() > 0; }

seastar::future<std::expected<
cloud_storage_clients::client::list_bucket_result,
chunked_vector<cloud_storage_clients::client::list_bucket_item>,
cloud_storage_clients::error_outcome>>
next_page() {
// cached continuation is single use. pass it to list_objects and null
// it out immediately.
auto objects = co_await storage_->list_objects(
&as_, std::exchange(continuation_token_, std::nullopt));

// fairly naive approach to caching the token. if the list request
// failed, we leave the cached token empty, but if some other error
// occurs while processing a page, we keep the token and "skip" that
// page. with lexicographically ordered list results and monotonic
// epochs, any eligible keys in a skipped page are guaranteed to appear
// in a subsequent round. given the volume of L0 objects at higher
// throughput rates, we're going to err on the side of making progress
// (vs performing a perfect sweep of outstanding objects).
if (
objects.has_value()
&& !objects.value().next_continuation_token.empty()) {
continuation_token_.emplace(
std::move(objects.value().next_continuation_token));
auto prefix_range = compute_prefix_range(
node_info_->shard_index(), node_info_->total_shards());
vlog(cd_log.debug, "PREFIX RANGE: {}", prefix_range);
auto stop = ss::stop_iteration::no;
// keep listing until we find objects in our assigned prefix range or run
// out of pages
while (!stop) {
auto res = co_await do_next_page(prefix_range);
if (!res.has_value()) {
co_return std::unexpected{res.error()};
}
auto [objects, should_stop] = std::move(res).value();
stop = should_stop;
if (!objects.empty()) {
co_return std::move(objects);
}
}
co_return objects;
co_return chunked_vector<
cloud_storage_clients::client::list_bucket_item>{};
}

size_t delete_objects(
@@ -163,7 +161,77 @@
}

private:
seastar::future<std::expected<
std::pair<
chunked_vector<cloud_storage_clients::client::list_bucket_item>,
ss::stop_iteration>,
cloud_storage_clients::error_outcome>>
do_next_page(prefix_range_inclusive prefix_range) {
// cached continuation is single use. pass it to list_objects and null
// it out immediately.
auto objects = co_await storage_->list_objects(
&as_, std::exchange(continuation_token_, std::nullopt));

if (!objects.has_value()) {
co_return std::unexpected{objects.error()};
}
// list_objects returned nothing so we can bail out right away
if (objects.value().contents.empty()) {
co_return std::make_pair(
std::move(objects.value().contents), ss::stop_iteration::yes);
}

// fairly naive approach to caching the token. if the list request
// failed, we leave the cached token empty, but if some other error
// occurs while processing a page, we keep the token and "skip" that
// page. with lexicographically ordered list results and monotonic
// epochs, any eligible keys in a skipped page are guaranteed to appear
// in a subsequent round. given the volume of L0 objects at higher
// throughput rates, we're going to err on the side of making progress
// (vs performing a perfect sweep of outstanding objects).
if (!objects.value().next_continuation_token.empty()) {
continuation_token_.emplace(
std::move(objects.value().next_continuation_token));
}

auto& contents = objects.value().contents;
auto total_objects = contents.size();
auto give_up = ss::stop_iteration::no;
auto [end, _] = std::ranges::remove_if(
contents,
[&prefix_range,
&give_up](const cloud_storage_clients::client::list_bucket_item& o) {
auto obj_prefix = object_path_factory::level_zero_path_to_prefix(
o.key);
// list results are lexicographically ordered, so we should give
// up only once we've exceeded this shard's max acceptable prefix
// OR if we hit an invalid key
if (!obj_prefix.has_value()) {
vlog(
cd_log.error,
"Unable to parse prefix during L0 GC: {}",
obj_prefix.error());
give_up = ss::stop_iteration::yes;
return true;
}
if (obj_prefix.value() > prefix_range.max) {
give_up = ss::stop_iteration::yes;
}
return !prefix_range.contains(obj_prefix.value());
});
contents.erase_to_end(end);

vlog(
cd_log.debug,
"RESPONSE SIZE: {} vs ACCEPTED SIZE: {}",
total_objects,
contents.size());

co_return std::make_pair(std::move(contents), give_up);
}
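As an aside for reviewers, the predicate above reduces to: keep keys whose parsed prefix lies in [min, max], and flag give-up once a key sorts past max or fails to parse. A minimal standalone sketch of that rule, with a toy decimal-prefix parser and illustrative names that are not taken from the PR:

#include <cstdint>
#include <cstdio>
#include <optional>
#include <string>
#include <vector>

struct filter_result {
    std::vector<std::string> kept; // keys whose prefix falls inside [min, max]
    bool give_up = false;          // set once the page sorted past max or a key is unparsable
};

// Mirrors the remove_if predicate above: keep in-range keys, flag give-up as
// soon as a key's prefix exceeds max (sorted input) or cannot be parsed.
filter_result filter_page(
  const std::vector<std::string>& sorted_keys, uint32_t min, uint32_t max) {
    filter_result out;
    for (const auto& key : sorted_keys) {
        std::optional<uint32_t> prefix;
        try {
            // toy parser: leading decimal digits stand in for the real prefix
            prefix = static_cast<uint32_t>(std::stoul(key));
        } catch (...) {
            out.give_up = true; // invalid key: stop scanning further pages
            continue;
        }
        if (*prefix > max) {
            out.give_up = true; // nothing later in a sorted listing can match
        }
        if (*prefix >= min && *prefix <= max) {
            out.kept.push_back(key);
        }
    }
    return out;
}

int main() {
    auto r = filter_page({"90/a", "120/b", "200/c"}, 100, 180);
    std::printf("kept=%zu give_up=%d\n", r.kept.size(), r.give_up); // kept=1 give_up=1
}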

std::unique_ptr<object_storage> storage_;
std::unique_ptr<node_info> node_info_;
level_zero_gc_probe* probe_;
ssx::work_queue worker_;
// TODO: configurable limits?
@@ -202,7 +270,9 @@ class object_storage_remote_impl : public level_zero_gc::object_storage {
object_path_factory::level_zero_data_dir(),
std::nullopt /*delimiter*/,
std::nullopt /*item_filter*/,
std::nullopt /*max_keys*/,
// TODO(oren): should depend on the backend (ABS allows up to 5000 keys,
// though it may not be useful to take that many)
1000 /*max_keys*/,
std::move(continuation_token));
if (res.has_value()) {
co_return std::move(res).assume_value();
@@ -484,25 +554,58 @@ class epoch_source_impl : public level_zero_gc::epoch_source {
seastar::sharded<cluster::topic_table>* topic_table_;
};

class node_info_impl : public level_zero_gc::node_info {
public:
node_info_impl(
model::node_id self, seastar::sharded<cluster::members_table>* mt)
: self_(self)
, members_table_(mt) {}

// TODO: use uint32 I guess
size_t shard_index() const final {
return shards_up_to(self_) + seastar::this_shard_id();
}
size_t total_shards() const final {
return shards_up_to(model::node_id::max());
}

private:
size_t shards_up_to(model::node_id ub) const {
size_t total{0};
for (const auto& [id, node] : members_table_->local().nodes()) {
if (id < ub) {
total += node.broker.properties().cores;
}
}
return total;
}
model::node_id self_;
seastar::sharded<cluster::members_table>* members_table_;
};
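For intuition, shard_index()/total_shards() flatten (node, shard) pairs into one global index: sum the cores of every node with a smaller node id, then add the local shard id. A rough standalone sketch with made-up node ids and core counts (none of these values come from the PR):

#include <cstddef>
#include <cstdio>
#include <map>

// Global shard index = cores on all nodes with a smaller node id, plus the
// local shard id; total shards = cores summed over every node.
size_t global_shard_index(
  const std::map<int, unsigned>& cores_by_node, int self, unsigned local_shard) {
    size_t below = 0;
    for (const auto& [id, cores] : cores_by_node) {
        if (id < self) {
            below += cores;
        }
    }
    return below + local_shard;
}

int main() {
    // hypothetical cluster: nodes 0 and 1 with 8 cores each, node 2 with 16
    std::map<int, unsigned> cores{{0, 8}, {1, 8}, {2, 16}};
    // shard 3 on node 1 gets global index 8 + 3 = 11 of 32 total shards
    std::printf("%zu\n", global_shard_index(cores, /*self=*/1, /*local_shard=*/3));
}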

level_zero_gc::level_zero_gc(
level_zero_gc_config config,
std::unique_ptr<object_storage> storage,
std::unique_ptr<epoch_source> epoch_source)
std::unique_ptr<epoch_source> epoch_source,
std::unique_ptr<node_info> node_info)
: config_(std::move(config))
, epoch_source_(std::move(epoch_source))
, should_run_(false) // begin in a stopped state
, should_shutdown_(false)
, worker_(worker())
, probe_(config::shard_local_cfg().disable_metrics())
, delete_worker_(
std::make_unique<list_delete_worker>(std::move(storage), probe_)) {}
std::make_unique<list_delete_worker>(
std::move(storage), std::move(node_info), probe_)) {}

level_zero_gc::level_zero_gc(
model::node_id self,
cloud_io::remote* remote,
cloud_storage_clients::bucket_name bucket,
seastar::sharded<cluster::health_monitor_frontend>* health_monitor,
seastar::sharded<cluster::controller_stm>* controller_stm,
seastar::sharded<cluster::topic_table>* topic_table)
seastar::sharded<cluster::topic_table>* topic_table,
seastar::sharded<cluster::members_table>* members_table)
: level_zero_gc(
level_zero_gc_config{
.deletion_grace_period
@@ -516,7 +619,8 @@
},
std::make_unique<object_storage_remote_impl>(remote, std::move(bucket)),
std::make_unique<epoch_source_impl>(
health_monitor, controller_stm, topic_table)) {}
health_monitor, controller_stm, topic_table),
std::make_unique<node_info_impl>(self, members_table)) {}

level_zero_gc::~level_zero_gc() = default;

@@ -676,7 +780,7 @@ level_zero_gc::do_try_to_collect(std::optional<cluster_epoch>& max_gc_epoch) {
std::optional<cluster_epoch> last_epoch;
object_id::prefix_t last_prefix{0};

for (const auto& object : candidate_objects.value().contents) {
for (const auto& object : candidate_objects.value()) {
const auto object_epoch = object_path_factory::level_zero_path_to_epoch(
object.key);

@@ -758,4 +862,44 @@ level_zero_gc::do_try_to_collect(std::optional<cluster_epoch>& max_gc_epoch) {
std::move(eligible_objects), object_keys_total_bytes);
}

bool prefix_range_inclusive::contains(T v) const {
auto res = v >= min && v <= max;
if (!res) {
vlog(cd_log.trace, "PREFIX OUT OF RANGE: {} not in {}", v, *this);
}
return res;
}

prefix_range_inclusive
compute_prefix_range(size_t shard_idx, size_t total_shards) {
// TODO: maybe wrap around if the shard id exceeds the number of prefixes?
auto total_prefixes = object_id::prefix_max + 1;
total_shards = std::min(total_shards, static_cast<size_t>(total_prefixes));
if (shard_idx >= total_shards) {
return {.min = 0, .max = 0};
}
auto stride = total_prefixes / total_shards;
auto min = static_cast<object_id::prefix_t>(shard_idx * stride);
auto max = static_cast<object_id::prefix_t>(min + stride - 1);
if (shard_idx == total_shards - 1) {
max = object_id::prefix_max;
}

vassert(min >= 0, "Invalid min prefix: {}", min);
vassert(max <= object_id::prefix_max, "Invalid max prefix: {}", max);

vlog(
cd_log.info,
"STRIDE: {} IDX: {} PREFIXES: [{},{}]\n",
stride,
shard_idx,
min,
max);

return {
.min = min,
.max = max,
};
}
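To make the stride arithmetic concrete, here is a standalone sketch of the same split, assuming for illustration an 8-bit prefix space (prefix_max = 255); the actual prefix width may differ:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstdio>

struct range {
    uint32_t min;
    uint32_t max;
};

range split(size_t shard_idx, size_t total_shards, uint32_t prefix_max) {
    const size_t total_prefixes = static_cast<size_t>(prefix_max) + 1;
    total_shards = std::min(total_shards, total_prefixes);
    if (shard_idx >= total_shards) {
        return {.min = 0, .max = 0}; // surplus shards collapse to [0,0], as above
    }
    const size_t stride = total_prefixes / total_shards;
    auto min = static_cast<uint32_t>(shard_idx * stride);
    auto max = static_cast<uint32_t>(min + stride - 1);
    if (shard_idx == total_shards - 1) {
        max = prefix_max; // last shard absorbs the remainder
    }
    return {.min = min, .max = max};
}

int main() {
    // 3 shards over 256 prefixes: stride 85 -> [0,84], [85,169], [170,255]
    for (size_t i = 0; i < 3; ++i) {
        auto r = split(i, 3, /*prefix_max=*/255);
        std::printf("shard %zu -> [%u,%u]\n", i, r.min, r.max);
    }
}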

} // namespace cloud_topics