8 changes: 8 additions & 0 deletions src/v/cloud_topics/app.cc
@@ -163,6 +163,14 @@ ss::future<> app::wire_up_notifications() {
}
});
});
co_await domain_supervisor.invoke_on_all([this](auto& ds) {
manager.local().on_l1_domain_leader([&ds](
const model::ntp& ntp,
const model::topic_id_partition&,
auto partition) noexcept {
ds.on_domain_leadership_change(ntp, std::move(partition));
});
});
}

ss::future<> app::stop() {
6 changes: 5 additions & 1 deletion src/v/cloud_topics/housekeeper/manager.cc
@@ -127,10 +127,14 @@ void housekeeper_manager::stop_housekeeper(model::topic_id_partition tidp) {
ss::future<> housekeeper_manager::start() { co_return; }

ss::future<> housekeeper_manager::stop() {
vlog(cd_log.info, "stopping cloud_topics::housekeeper_manager");
co_await _queue.shutdown();
for (auto& [_, state] : _state) {
vlog(cd_log.info, "cloud_topics::housekeeper_manager queue stopped");
for (auto& [tidp, state] : _state) {
vlog(cd_log.info, "stopping cloud_topics::housekeeper {}", tidp);
co_await state.housekeeper->stop();
}
vlog(cd_log.info, "successfully stopped cloud_topics::housekeeper");
_state.clear();
}

20 changes: 18 additions & 2 deletions src/v/cloud_topics/level_one/common/object.cc
@@ -327,6 +327,10 @@ class object_builder_impl final : public object_builder {
: _output(std::move(output))
, _opts(opts) {}

~object_builder_impl() override {
vassert(_closed, "L1 object builders must be closed unconditionally");
}
Comment on lines +330 to +332

Copilot AI Sep 24, 2025

Using vassert in a destructor can cause program termination during stack unwinding if an exception is already being processed. Consider using a different error handling mechanism like logging an error instead.

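One possible shape for the alternative the comment describes (logging instead of asserting), shown only as a sketch rather than as part of this PR; it assumes the cloud-topics logger cd_log and the vlog macro used elsewhere in this diff are visible from this translation unit:

~object_builder_impl() override {
    // Hypothetical variant: a log line keeps destruction safe during stack
    // unwinding, whereas a failed vassert would terminate the process if the
    // builder was never closed.
    if (!_closed) {
        vlog(cd_log.error, "L1 object builder destroyed without being closed");
    }
}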


ss::future<> start_partition(model::topic_id_partition tidp) final {
end_partition();
co_await serde_write_to_stream(data_type::partition_marker, tidp);
@@ -393,7 +397,10 @@ class object_builder_impl final : public object_builder {
co_return info;
}

ss::future<> close() final { return _output.close(); }
ss::future<> close() final {
_closed = true;
return _output.close();
}

size_t file_size() const final { return _offset; }

Expand Down Expand Up @@ -444,6 +451,7 @@ class object_builder_impl final : public object_builder {
model::topic_id_partition _current_tidp{};
footer::partition _current_partition;
options _opts;
bool _closed = false;
};

} // namespace
@@ -460,6 +468,10 @@ class object_reader_impl : public object_reader {
explicit object_reader_impl(ss::input_stream<char> input)
: _input(std::move(input)) {}

~object_reader_impl() override {
vassert(_closed, "L1 object readers must be closed unconditionally");

Copilot AI Sep 24, 2025


Using vassert in a destructor can cause program termination during stack unwinding if an exception is already being processed. Consider using a different error handling mechanism like logging an error instead.

Suggested change
vassert(_closed, "L1 object readers must be closed unconditionally");
if (!_closed) {
fmt::print(stderr, "ERROR: L1 object readers must be closed unconditionally\n");
}

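A related hypothetical middle ground, neither in the PR nor in the suggestion above: keep the hard assert for ordinary destruction but downgrade it to a log while an exception is propagating, using std::uncaught_exceptions() from <exception> (assumed to be reachable here, as is cd_log):

~object_reader_impl() override {
    if (std::uncaught_exceptions() == 0) {
        // Normal destruction: an unclosed reader is a programming error.
        vassert(_closed, "L1 object readers must be closed unconditionally");
    } else if (!_closed) {
        // Stack unwinding: an exception is already being handled, so log the
        // leak instead of terminating the process via a failed vassert.
        vlog(cd_log.error, "L1 object reader destroyed without being closed");
    }
}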

}

ss::future<result> read_next() final {
if (_saw_footer) {
// After the footer we have 4 bytes for the size of the footer, so
@@ -493,7 +505,10 @@ class object_reader_impl : public object_reader {
"unknown data type in object: {}", std::to_underlying(dt)));
}

ss::future<> close() final { return _input.close(); }
ss::future<> close() final {
_closed = true;
return _input.close();
}

private:
template<typename T>
Expand Down Expand Up @@ -538,6 +553,7 @@ class object_reader_impl : public object_reader {

ss::input_stream<char> _input;
bool _saw_footer = false;
bool _closed = false;
};

} // namespace
2 changes: 0 additions & 2 deletions src/v/cloud_topics/level_one/domain/BUILD
@@ -14,8 +14,6 @@ redpanda_cc_library(
":domain_manager",
"//src/v/cloud_topics:logger",
"//src/v/cluster",
"//src/v/cluster/utils:partition_change_notifier_api",
"//src/v/cluster/utils:partition_change_notifier_impl",
"//src/v/model",
"//src/v/ssx:when_all",
"//src/v/ssx:work_queue",
71 changes: 23 additions & 48 deletions src/v/cloud_topics/level_one/domain/domain_supervisor.cc
@@ -15,8 +15,6 @@
#include "cluster/controller.h"
#include "cluster/topics_frontend.h"
#include "cluster/types.h"
#include "cluster/utils/partition_change_notifier.h"
#include "cluster/utils/partition_change_notifier_impl.h"
#include "model/fundamental.h"
#include "model/namespace.h"
#include "ssx/when_all.h"
@@ -32,11 +30,6 @@ class domain_supervisor::impl {
public:
explicit impl(cluster::controller* controller)
: _controller(controller)
, _partition_notifications(
cluster::partition_change_notifier_impl::make_default(
_controller->get_raft_manager(),
_controller->get_partition_manager(),
_controller->get_topics_state()))
, _queue([](const std::exception_ptr& ex) {
vlog(cd_log.error, "Unexpected domain supervisor error: {}", ex);
}) {}
@@ -46,36 +39,23 @@
_as = {};
_loop = do_topic_reconciliation_loop();
}
_partition_notifications_id
= _partition_notifications->register_partition_notifications(
[this](
cluster::partition_change_notifier::notification_type,
const model::ntp& ntp,
std::optional<cluster::partition_change_notifier::partition_state>
partition) {
auto is_l1_topic = ntp.ns == model::kafka_internal_namespace
&& ntp.tp.topic == model::l1_metastore_topic;
if (!is_l1_topic) {
return;
}
// NOTE: listen on all partition notification types.
_queue.submit([this,
ntp = ntp,
partition = std::move(partition)]() mutable {
return reset_domain_manager(
std::move(ntp), std::move(partition));
});
});
co_return;
}

void on_domain_leadership_change(
const model::ntp& ntp,
ss::optimized_optional<ss::lw_shared_ptr<cluster::partition>> partition) {
_queue.submit(
[this, ntp = ntp, partition = std::move(partition)]() mutable {
return reset_domain_manager(std::move(ntp), std::move(partition));
});
}

ss::future<> stop() {
if (ss::this_shard_id() == 0 && _loop) {
_as.request_abort();
co_await *std::exchange(_loop, std::nullopt);
}
_partition_notifications->unregister_partition_notifications(
_partition_notifications_id);
co_await _queue.shutdown();
chunked_vector<ss::future<std::monostate>> stop_futs;
stop_futs.reserve(_domains.size());
@@ -262,48 +242,38 @@ class domain_supervisor::impl {

ss::future<> reset_domain_manager(
model::ntp ntp,
std::optional<cluster::partition_change_notifier::partition_state>
partition) {
ss::optimized_optional<ss::lw_shared_ptr<cluster::partition>> partition) {
auto dm_id = domain_manager_id{ntp};
auto dm_it = _domains.find(dm_id);
auto dm_exists = dm_it != _domains.end();
if (dm_exists) {
if (partition) {
// We already have a domain manager, there is nothing to do.
co_return;
}
auto dm = std::move(dm_it->second);
_domains.erase(dm_it);
auto stop_fut = co_await ss::coroutine::as_future(
dm->stop_and_wait());
if (stop_fut.failed()) {
auto ex = stop_fut.get_exception();
vlog(
cd_log.error,
"Removing domain manager {} failed: {}",
dm_id,
stop_fut.get_exception());
ex);
}
co_return;
}
auto requires_domain_mgr = partition && partition->is_leader;
if (!requires_domain_mgr) {
co_return;
}
auto& pm = _controller->get_partition_manager().local();
auto partition_ptr = pm.get(ntp);
if (!partition_ptr) {
if (!partition) {
co_return;
}
auto domain_mgr = ss::make_lw_shared<domain_manager>(
partition_ptr->raft()->stm_manager()->get<simple_stm>());
(*partition)->raft()->stm_manager()->get<simple_stm>());
_domains.emplace(dm_id, std::move(domain_mgr));
}

cluster::controller* _controller;

// Notification hook to start domain managers on leaders of the L1
// metastore topic partitions.
std::unique_ptr<cluster::partition_change_notifier>
_partition_notifications;
cluster::notification_id_type _partition_notifications_id
= cluster::notification_id_type_invalid;

// Queue to process async work associated with starting and stopping domain
// managers when handling partition notifications.
ssx::work_queue _queue;
@@ -336,4 +306,9 @@ ss::future<bool> domain_supervisor::maybe_create_metastore_topic() {
return _impl->maybe_create_metastore_topic();
}

void domain_supervisor::on_domain_leadership_change(
const model::ntp& ntp,
ss::optimized_optional<ss::lw_shared_ptr<cluster::partition>> partition) {
_impl->on_domain_leadership_change(ntp, std::move(partition));
}
} // namespace cloud_topics::l1
12 changes: 11 additions & 1 deletion src/v/cloud_topics/level_one/domain/domain_supervisor.h
@@ -13,10 +13,13 @@
#include "model/fundamental.h"

#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/util/optimized_optional.hh>

namespace cluster {
class controller;
}
class partition;
} // namespace cluster

namespace cloud_topics::l1 {
class domain_manager;
@@ -37,6 +40,13 @@ class domain_supervisor {
ss::future<> start();
ss::future<> stop();

// This is expected to be called when leadership for an L1 domain changes,
// notifying the domain_supervisor that a manager needs to be created (or
// removed if partition is nullptr).
void on_domain_leadership_change(
const model::ntp&,
ss::optimized_optional<ss::lw_shared_ptr<cluster::partition>>);

// Returns the domain manager for the given L1 metastore NTP, if one exists
// (e.g. if it is currently leader and has processed the appropriate
// leadership notification); otherwise returns nullopt.
55 changes: 35 additions & 20 deletions src/v/cloud_topics/manager/manager.cc
@@ -66,12 +66,37 @@ ss::future<> cloud_topics_manager::start() {
return topic_table_->local().get_topic_cfg(
{ntp.ns, ntp.tp.topic});
});
if (!config) {
vlog(
cd_log.warn, "unable to find topic configuration for {}", ntp);
if (!config || !config->tp_id) {
auto it = topic_id_mapping_.find(ntp);
if (it == topic_id_mapping_.end()) {
// This can happen if a topic is deleted and it wasn't a cloud
// topic, so keep logging below INFO.
vlog(cd_log.debug, "unable to find topic ID for {}", ntp);
return;
}
// Always emit these even if they are not cloud topics (we can't
// know), because it means the topic was deleted.
model::topic_id_partition tidp{it->second, ntp.tp.partition};
topic_id_mapping_.erase(it);
on_leadership_change(ntp, tidp, /*is_leader=*/false);
return;
}
on_leadership_change(ntp, is_leader, *config);
if (
!config->properties.cloud_topic_enabled
&& model::topic_namespace_view(ntp) != model::l1_metastore_nt) {
return;
}
if (is_leader) {
// Always ensure that if there is a leadership notification
// emitted, that we also emit a no leader notification, even if
// the topic is deleted and we no longer have the topic ID.
topic_id_mapping_.try_emplace(ntp, config->tp_id.value());
} else {
topic_id_mapping_.erase(ntp);
}
model::topic_id_partition tidp{
config->tp_id.value(), ntp.tp.partition};
on_leadership_change(ntp, tidp, is_leader);
},
notify_current_state::yes);
co_return;
@@ -87,28 +112,18 @@

void cloud_topics_manager::on_leadership_change(
const model::ntp& ntp,
bool is_leader,
const cluster::topic_configuration& config) noexcept {
const model::topic_id_partition& tidp,
bool is_leader) noexcept {
ss::optimized_optional<ss::lw_shared_ptr<cluster::partition>> partition;
if (is_leader) {
partition = partition_manager_->local().get(ntp);
}
if (config.properties.cloud_topic_enabled) {
if (!config.tp_id) {
vlog(cd_log.warn, "missing topic ID for cloud topic: {}", ntp);
return;
}
model::topic_id_partition tidp{*config.tp_id, ntp.tp.partition};
for (const auto& cb : ctp_callbacks_) {
if (model::l1_metastore_nt == model::topic_namespace_view(ntp)) {
for (const auto& cb : l1_callbacks_) {
cb(ntp, tidp, partition);
}
} else if (model::l1_metastore_nt == model::topic_namespace_view(ntp)) {
if (!config.tp_id) {
vlog(cd_log.warn, "missing topic ID for metastore domain: {}", ntp);
return;
}
model::topic_id_partition tidp{*config.tp_id, ntp.tp.partition};
for (const auto& cb : l1_callbacks_) {
} else {
for (const auto& cb : ctp_callbacks_) {
cb(ntp, tidp, partition);
}
}
15 changes: 13 additions & 2 deletions src/v/cloud_topics/manager/manager.h
@@ -14,6 +14,8 @@
#include "cluster/topic_configuration.h"
#include "cluster/utils/partition_change_notifier.h"
#include "model/fundamental.h"
#include "model/ktp.h"
#include "model/metadata.h"

#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
@@ -103,15 +105,24 @@ class cloud_topics_manager {
private:
void on_leadership_change(
const model::ntp& ntp,
bool is_leader,
const cluster::topic_configuration&) noexcept;
const model::topic_id_partition& tidp,
bool is_leader) noexcept;

ss::sharded<cluster::partition_manager>* partition_manager_;
ss::sharded<cluster::topic_table>* topic_table_;
std::unique_ptr<cluster::partition_change_notifier> notifier_;
std::vector<notification_cb_t> ctp_callbacks_;
std::vector<notification_cb_t> l1_callbacks_;
std::optional<cluster::notification_id_type> notification_;
// When a topic is deleted we no longer have its configuration (and thus its
// topic ID), but we still need to emit a notification carrying that ID.
//
// Handle this by keeping an explicit mapping and looking the ID up when the
// configuration can no longer be found.
//
// We have to key this by ntp and not just ns_tp because we want to GC
// entries over time.
model::ntp_map_type<model::topic_id> topic_id_mapping_;
};

} // namespace cloud_topics