26 changes: 26 additions & 0 deletions src/v/cloud_topics/level_one/metastore/lsm/BUILD
@@ -71,3 +71,29 @@ redpanda_cc_library(
"@seastar",
],
)

redpanda_cc_library(
name = "stm",
srcs = [
"stm.cc",
],
hdrs = [
"stm.h",
],
implementation_deps = [
"//src/v/cloud_topics:logger",
"//src/v/serde",
"//src/v/ssx:future_util",
"//src/v/utils:prefix_logger",
],
visibility = ["//visibility:public"],
deps = [
":lsm_update",
":state",
"//src/v/cluster:state_machine_registry",
"//src/v/config",
"//src/v/model",
"//src/v/raft",
"@seastar",
],
)
7 changes: 7 additions & 0 deletions src/v/cloud_topics/level_one/metastore/lsm/state.h
@@ -117,4 +117,11 @@ struct lsm_state
std::optional<serialized_manifest> persisted_manifest;
};

struct lsm_stm_snapshot
: public serde::
envelope<lsm_stm_snapshot, serde::version<0>, serde::compat_version<0>> {
auto serde_fields() { return std::tie(state); }
lsm_state state;
};

} // namespace cloud_topics::l1
256 changes: 256 additions & 0 deletions src/v/cloud_topics/level_one/metastore/lsm/stm.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "cloud_topics/level_one/metastore/lsm/stm.h"

#include "cloud_topics/level_one/metastore/lsm/lsm_update.h"
#include "cloud_topics/level_one/metastore/lsm/state.h"
#include "cloud_topics/logger.h"
#include "model/record_batch_types.h"
#include "serde/async.h"
#include "ssx/future-util.h"
#include "utils/prefix_logger.h"

#include <seastar/coroutine/as_future.hh>

#include <optional>

namespace cloud_topics::l1 {

namespace {
template<typename Res>
void maybe_log_update_error(
prefix_logger& log,
lsm_update_key key,
model::offset o,
const std::expected<Res, lsm_update_error>& r) {
if (r.has_value()) {
return;
}
// NOTE: inability to update the STM is not necessarily a bug! It indicates
// that this update's construction raced with another update that broke an
// invariant required to apply this update. The expectation is that the
// caller constructs a new update and tries again if needed.
vlog(
log.debug,
"L1 LSM STM {} update at offset {} didn't apply: {}",
key,
o,
r.error());
}
} // namespace

stm::stm(
ss::logger& logger,
raft::consensus* raft,
config::binding<std::chrono::seconds> snapshot_delay)
: metastore_stm_base("l1_lsm_stm.snapshot", logger, raft)
, snapshot_delay_secs_(std::move(snapshot_delay)) {
snapshot_timer_.set_callback([this] { write_snapshot_async(); });
}

ss::future<std::expected<model::term_id, stm::errc>>
stm::sync(model::timeout_clock::duration timeout) {
auto sync_res = co_await ss::coroutine::as_future(
metastore_stm_base::sync(timeout));
if (sync_res.failed()) {
auto eptr = sync_res.get_exception();
auto msg = fmt::format("Exception caught while syncing: {}", eptr);
if (ssx::is_shutdown_exception(eptr)) {
vlog(cd_log.debug, "Ignoring shutdown error: {}", msg);
co_return std::unexpected(errc::shutting_down);
}
vlog(cd_log.warn, "{}", msg);
co_return std::unexpected(errc::raft_error);
}

if (!sync_res.get()) {
co_return std::unexpected(errc::not_leader);
}
// At this point we're guaranteed that this node is the leader and that the
// STM has caught up to the end of the previous term (this doesn't mean
// the STM is fully caught up right now though, e.g. if there are
// in-flight updates from this term!)
co_return _insync_term;
}

ss::future<std::expected<model::offset, stm::errc>> stm::replicate_and_wait(
model::term_id term, model::record_batch batch, ss::abort_source& as) {
constexpr auto replicate_timeout = 10s;
auto opts = raft::replicate_options(
raft::consistency_level::quorum_ack,
term,
replicate_timeout,
std::ref(as));
opts.set_force_flush();
const auto offsets_delta = batch.header().last_offset_delta;
const auto res = co_await _raft->replicate(std::move(batch), opts);
if (res.has_error()) {
co_return std::unexpected(errc::raft_error);
Contributor:
This is an opportunity for concurrency problems to emerge. I think that here we should either use an indefinite timeout or step down the leadership in case of error. Otherwise it will trigger a race condition on timeout.

Contributor Author:
Yeah, generally I agree we need to be cautious about handling failures. I left some thoughts here: #29160 (comment) (TLDR: either callers retry, or step down like you mention).
}
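A minimal sketch of the step-down alternative raised in the thread above, assuming raft::consensus exposes a step_down() entry point (the exact signature should be checked before relying on this); the PR as written keeps the fixed timeout and returns errc::raft_error:

    // Hypothetical: relinquish leadership on a failed replication so a new
    // leader re-syncs, rather than racing a timed-out append.
    // NOTE: step_down() is an assumed API; verify against raft::consensus.
    if (res.has_error()) {
        vlog(cd_log.warn, "replication failed, stepping down: {}", res.error());
        co_await _raft->step_down("l1_lsm_stm replication failure");
        co_return std::unexpected(errc::not_leader);
    }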
auto last_offset = res.value().last_offset;
auto base_offset = model::offset(last_offset() - offsets_delta);
auto success = co_await wait_no_throw(
last_offset, ss::lowres_clock::now() + 30s, as);
if (!success || _raft->term() != term) {
vlog(
cd_log.debug,
"Waiting for apply of batch [{}, {}] failed",
base_offset,
last_offset);
co_return std::unexpected(errc::raft_error);
}
co_return base_offset;
}

ss::future<> stm::do_apply(const model::record_batch& batch) {
if (batch.header().type != model::record_batch_type::l1_stm) {
co_return;
}

auto iter = model::record_batch_iterator::create(batch);
while (iter.has_next()) {
auto r = iter.next();
auto key_buf = r.release_key();
if (key_buf.size_bytes() == 0) {
continue;
}

iobuf_parser key_parser(std::move(key_buf));
auto key = serde::read<lsm_update_key>(key_parser);

iobuf value_buf = r.release_value();
iobuf_parser value_parser(std::move(value_buf));

auto o = batch.base_offset() + model::offset_delta{r.offset_delta()};
switch (key) {
case lsm_update_key::apply_write_batch: {
auto update = serde::read<apply_write_batch_update>(value_parser);
auto result = update.apply(state_, o);
Comment on lines +136 to +137

Member:
Where are the volatile rows consumed?

Contributor Author:
Chatted on Slack; it's in the next PR, in replicated_database.

Contributor:
This might fail if the sequence number is incorrect. How will the caller (the code that replicated the batch) know that this happened?

Contributor Author:
I think it would be fair to make this a vassert. This is an internal invariant maintained by the state machine by virtue of being called with monotonically increasing offsets.
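A sketch of the vassert variant suggested above (illustrative only; this PR keeps the logged soft failure below, and the message text here is made up):

    // Hypothetical: treat a sequence-number violation as a crash-worthy
    // internal invariant instead of a logged soft failure.
    auto result = update.apply(state_, o);
    vassert(
      result.has_value(),
      "L1 LSM STM apply_write_batch at offset {} broke an internal invariant: {}",
      o,
      result.error());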

maybe_log_update_error(_log, key, o, result);
break;
}
case lsm_update_key::persist_manifest: {
auto update = co_await serde::read_async<persist_manifest_update>(
value_parser);
auto result = update.apply(state_);
maybe_log_update_error(_log, key, o, result);
break;
}
case lsm_update_key::set_domain_uuid: {
auto update = serde::read<set_domain_uuid_update>(value_parser);
auto result = update.apply(state_);
maybe_log_update_error(_log, key, o, result);
break;
}
case lsm_update_key::reset_manifest: {
auto update = co_await serde::read_async<reset_manifest_update>(
value_parser);
auto result = update.apply(state_, batch.term(), o);
maybe_log_update_error(_log, key, o, result);
break;
}
}
}

rearm_snapshot_timer();
}
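For orientation, a hypothetical sketch of the producer side that do_apply decodes (the real producer lands in a later PR, per the thread above). storage::record_batch_builder is existing Redpanda API; the helper name and the choice of update are illustrative:

    // Hypothetical: build a batch with one record whose key selects the
    // update type and whose value carries the serde-encoded update,
    // mirroring what do_apply reads back.
    model::record_batch
    make_set_domain_uuid_batch(set_domain_uuid_update update) {
        storage::record_batch_builder builder(
          model::record_batch_type::l1_stm, model::offset{0});
        iobuf key;
        serde::write(key, lsm_update_key::set_domain_uuid);
        iobuf value;
        serde::write(value, std::move(update));
        builder.add_raw_kv(std::move(key), std::move(value));
        return std::move(builder).build();
    }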

model::offset stm::max_removable_local_log_offset() { return {}; }

ss::future<raft::local_snapshot_applied>
stm::apply_local_snapshot(raft::stm_snapshot_header, iobuf&& snapshot_buf) {
auto parser = iobuf_parser(std::move(snapshot_buf));
auto snapshot = co_await serde::read_async<lsm_state>(parser);
state_ = std::move(snapshot);
co_return raft::local_snapshot_applied::yes;
}

ss::future<raft::stm_snapshot>
stm::take_local_snapshot(ssx::semaphore_units units) {
auto snapshot_offset = last_applied_offset();
auto snapshot = co_await make_snapshot();
units.return_all();
iobuf snapshot_buf;
co_await serde::write_async(snapshot_buf, std::move(snapshot));
co_return raft::stm_snapshot::create(
0, snapshot_offset, std::move(snapshot_buf));
}

ss::future<> stm::apply_raft_snapshot(const iobuf& snapshot_buf) {
auto parser = iobuf_parser(snapshot_buf.copy());
auto snapshot = co_await serde::read_async<lsm_state>(parser);
state_ = std::move(snapshot);
}

ss::future<iobuf> stm::take_raft_snapshot() {
iobuf snapshot_buf;
co_await serde::write_async(snapshot_buf, co_await make_snapshot());
co_return std::move(snapshot_buf);
}

ss::future<lsm_stm_snapshot> stm::make_snapshot() const {
lsm_stm_snapshot snapshot;
snapshot.state = state_.copy();
Contributor:
Is there ever a concern for stalls here?

Contributor Author:
Fair question -- certainly at some point, but I'm not sure. I will need to measure a bit.
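One hypothetical way to get that measurement (instrumentation sketch only, not part of the PR): time the copy against Seastar's default 500us task quota, the budget a non-yielding task must stay under to avoid a reactor stall:

    // Hypothetical instrumentation around the copy in make_snapshot().
    const auto start = std::chrono::steady_clock::now();
    snapshot.state = state_.copy();
    const auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
      std::chrono::steady_clock::now() - start);
    if (elapsed > std::chrono::microseconds{500}) {
        vlog(_log.debug, "lsm_state copy took {}us", elapsed.count());
    }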

co_return snapshot;
}

ss::future<> stm::stop() {
snapshot_timer_.cancel();
co_await metastore_stm_base::stop();
}

ss::future<> stm::maybe_write_snapshot() {
if (_raft->last_snapshot_index() >= last_applied()) {
co_return;
}
auto snapshot = co_await _raft->stm_manager()->take_snapshot();
vlog(
_log.debug,
"creating snapshot at offset: {}",
snapshot.last_included_offset);
co_await _raft->write_snapshot(
raft::write_snapshot_cfg(
snapshot.last_included_offset, std::move(snapshot.data)));
}

void stm::write_snapshot_async() {
ssx::background = ssx::spawn_with_gate_then(
_gate, [this] { return maybe_write_snapshot(); })
.handle_exception([this](const std::exception_ptr& e) {
vlog(_log.warn, "failed to write snapshot: {}", e);
})
.finally([holder = _gate.hold()] {});
}

void stm::rearm_snapshot_timer() {
if (_gate.is_closed() || snapshot_timer_.armed()) {
return;
}
snapshot_timer_.arm(ss::lowres_clock::now() + snapshot_delay_secs_());
}

bool lsm_stm_factory::is_applicable_for(
const storage::ntp_config& ntp_cfg) const {
const auto& ntp = ntp_cfg.ntp();
return (ntp.ns == model::kafka_internal_namespace)
&& (ntp.tp.topic == model::l1_metastore_topic);
}

void lsm_stm_factory::create(
raft::state_machine_manager_builder& builder,
raft::consensus* raft,
const cluster::stm_instance_config&) {
auto s = builder.create_stm<stm>(cd_log, raft, config::mock_binding(10s));
raft->log()->stm_manager()->add_stm(std::move(s));
}

} // namespace cloud_topics::l1
100 changes: 100 additions & 0 deletions src/v/cloud_topics/level_one/metastore/lsm/stm.h
@@ -0,0 +1,100 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include "cloud_topics/level_one/metastore/lsm/state.h"
#include "cluster/state_machine_registry.h"
#include "model/fundamental.h"
#include "raft/persisted_stm.h"

namespace cloud_topics::l1 {

using metastore_stm_base = raft::persisted_stm_no_snapshot_at_offset<>;

// A replicated state machine that backs an LSM database on object storage.
// This state machine applies optimistic concurrency control to ensure that
// updates go to the proper database. Any additional concurrency control
// (e.g. locking to ensure application integrity) is left to callers.
class stm final : public metastore_stm_base {
public:
static constexpr std::string_view name = "l1_lsm_stm";
enum class errc {
not_leader,
raft_error,
shutting_down,
};

explicit stm(
ss::logger&, raft::consensus*, config::binding<std::chrono::seconds>);
raft::consensus* raft() { return _raft; }

// Syncs the STM such that we're guaranteed that it has applied all records
// from previous terms. Calling this does _not_ ensure that all records
// from the current term have been applied, but it does establish for new
// leaders that they are up to date.
//
// Returns the current term.
ss::future<std::expected<model::term_id, errc>>
sync(model::timeout_clock::duration timeout);

// Replicates the given batch and waits for it to finish replicating.
// Success here does not guarantee that the replicated operation succeeded
// in updating the STM -- only that the apply was attempted.
ss::future<std::expected<model::offset, errc>> replicate_and_wait(
model::term_id, model::record_batch batch, ss::abort_source&);

const lsm_state& state() const { return state_; }

raft::stm_initial_recovery_policy
get_initial_recovery_policy() const final {
return raft::stm_initial_recovery_policy::read_everything;
}

protected:
ss::future<> stop() override;

ss::future<lsm_stm_snapshot> make_snapshot() const;

ss::future<> do_apply(const model::record_batch&) override;

model::offset max_removable_local_log_offset() override;

ss::future<raft::local_snapshot_applied>
apply_local_snapshot(raft::stm_snapshot_header, iobuf&& bytes) override;

ss::future<raft::stm_snapshot>
take_local_snapshot(ssx::semaphore_units) override;

ss::future<> apply_raft_snapshot(const iobuf&) final;

ss::future<iobuf> take_raft_snapshot() final;

private:
void rearm_snapshot_timer();
void write_snapshot_async();
ss::future<> maybe_write_snapshot();

// The deterministic state managed by this STM.
lsm_state state_;
config::binding<std::chrono::seconds> snapshot_delay_secs_;
ss::timer<ss::lowres_clock> snapshot_timer_;
};

class lsm_stm_factory : public cluster::state_machine_factory {
public:
lsm_stm_factory() = default;
bool is_applicable_for(const storage::ntp_config&) const final;
void create(
raft::state_machine_manager_builder&,
raft::consensus*,
const cluster::stm_instance_config&) final;
};

} // namespace cloud_topics::l1
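To make the OCC contract concrete, a hypothetical caller-side sketch of the sync / replicate_and_wait retry pattern that the NOTE in stm.cc expects; build_update_batch() and applied_ok() are illustrative stand-ins for logic outside this PR:

    // Hypothetical retry loop: rebuild the update from freshly synced state
    // whenever an apply loses an OCC race.
    ss::future<bool> try_update(stm& s, ss::abort_source& as) {
        for (int attempt = 0; attempt < 3; ++attempt) {
            auto term = co_await s.sync(10s);
            if (!term) {
                co_return false; // not leader, raft error, or shutting down
            }
            auto batch = build_update_batch(s.state());
            auto offset = co_await s.replicate_and_wait(
              term.value(), std::move(batch), as);
            if (!offset) {
                co_return false; // replication itself failed
            }
            // Success only means the apply was attempted; check whether the
            // update took effect and retry if it lost a race.
            if (applied_ok(s.state(), offset.value())) {
                co_return true;
            }
        }
        co_return false;
    }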