diff --git a/src/v/cloud_topics/level_one/metastore/BUILD b/src/v/cloud_topics/level_one/metastore/BUILD index eed26450e0aee..39b087d79185e 100644 --- a/src/v/cloud_topics/level_one/metastore/BUILD +++ b/src/v/cloud_topics/level_one/metastore/BUILD @@ -8,6 +8,18 @@ redpanda_cc_rpc_library( src = "rpc.json", ) +redpanda_cc_library( + name = "domain_uuid", + hdrs = [ + "domain_uuid.h", + ], + visibility = ["//visibility:public"], + deps = [ + "//src/v/utils:named_type", + "//src/v/utils:uuid", + ], +) + redpanda_cc_library( name = "garbage_collector", srcs = [ diff --git a/src/v/cloud_topics/level_one/metastore/domain_uuid.h b/src/v/cloud_topics/level_one/metastore/domain_uuid.h new file mode 100644 index 0000000000000..8833d47635263 --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/domain_uuid.h @@ -0,0 +1,19 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "utils/named_type.h" +#include "utils/uuid.h" + +namespace cloud_topics::l1 { + +using domain_uuid = named_type; + +} // namespace cloud_topics::l1 diff --git a/src/v/cloud_topics/level_one/metastore/lsm/BUILD b/src/v/cloud_topics/level_one/metastore/lsm/BUILD new file mode 100644 index 0000000000000..3d1622dac86b7 --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/BUILD @@ -0,0 +1,99 @@ +load("//bazel:build.bzl", "redpanda_cc_library") + +redpanda_cc_library( + name = "write_batch_row", + hdrs = [ + "write_batch_row.h", + ], + visibility = ["//visibility:public"], + deps = [ + "//src/v/bytes:iobuf", + "//src/v/serde", + "@seastar", + ], +) + +redpanda_cc_library( + name = "state", + srcs = [ + "state.cc", + ], + hdrs = [ + "state.h", + ], + implementation_deps = [ + "//src/v/serde:iobuf", + "//src/v/serde:named_type", + "//src/v/serde:optional", + "//src/v/serde:sstring", + "//src/v/serde:uuid", + "//src/v/serde:vector", + ], + visibility = ["//visibility:public"], + deps = [ + ":write_batch_row", + "//src/v/base", + "//src/v/bytes:iobuf", + "//src/v/bytes:iobuf_parser", + "//src/v/cloud_topics/level_one/metastore:domain_uuid", + "//src/v/container:chunked_vector", + "//src/v/lsm", + "//src/v/model", + "//src/v/serde", + "@seastar", + ], +) + +redpanda_cc_library( + name = "lsm_update", + srcs = [ + "lsm_update.cc", + ], + hdrs = [ + "lsm_update.h", + ], + implementation_deps = [ + "//src/v/cloud_topics/level_one/metastore:domain_uuid", + "//src/v/serde:iobuf", + "//src/v/serde:named_type", + "//src/v/serde:optional", + "//src/v/serde:uuid", + ], + visibility = ["//visibility:public"], + deps = [ + ":state", + ":write_batch_row", + "//src/v/base", + "//src/v/container:chunked_vector", + "//src/v/model", + "//src/v/serde", + "//src/v/utils:named_type", + "@seastar", + ], +) + +redpanda_cc_library( + name = "stm", + srcs = [ + "stm.cc", + ], + hdrs = [ + "stm.h", + ], + implementation_deps = [ + "//src/v/cloud_topics:logger", + "//src/v/serde", + "//src/v/ssx:future_util", + "//src/v/utils:prefix_logger", + ], + visibility = ["//visibility:public"], + deps = [ + ":lsm_update", + ":state", + "//src/v/cluster:state_machine_registry", + "//src/v/config", + "//src/v/model", + "//src/v/raft", + "@seastar", + ], +) diff --git a/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.cc b/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.cc new file mode 100644 index 0000000000000..532d2802d5b9d --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.cc @@ -0,0 +1,253 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "cloud_topics/level_one/metastore/lsm/lsm_update.h" + +#include "cloud_topics/level_one/metastore/domain_uuid.h" +#include "model/fundamental.h" +#include "serde/rw/iobuf.h" +#include "serde/rw/named_type.h" +#include "serde/rw/optional.h" +#include "serde/rw/uuid.h" + +#include + +namespace cloud_topics::l1 { + +std::expected +apply_write_batch_update::build( + const lsm_state& state, + domain_uuid expected_uuid, + chunked_vector rows) { + apply_write_batch_update update{ + .expected_uuid = expected_uuid, + .rows = std::move(rows), + }; + auto allowed = update.can_apply(state); + if (!allowed.has_value()) { + return std::unexpected(allowed.error()); + } + return update; +} + +std::expected +apply_write_batch_update::can_apply(const lsm_state& state) const { + if (state.domain_uuid != expected_uuid) { + return std::unexpected( + lsm_update_error{fmt::format( + "Expected domain UUID: {}, actual: {}", + expected_uuid, + state.domain_uuid)}); + } + if (rows.empty()) { + return std::unexpected(lsm_update_error{"Write batch is empty"}); + } + return std::monostate{}; +} + +std::expected +apply_write_batch_update::apply(lsm_state& state, model::offset base_offset) { + auto allowed = can_apply(state); + if (!allowed.has_value()) { + return std::unexpected(allowed.error()); + } + lsm::sequence_number seqno(base_offset() + state.seqno_delta); + for (auto& row : rows) { + state.volatile_buffer.push_back( + volatile_row{.seqno = seqno, .row = std::move(row)}); + } + + return std::monostate{}; +} + +apply_write_batch_update apply_write_batch_update::copy() const { + apply_write_batch_update ret{ + .expected_uuid = expected_uuid, + }; + for (const auto& r : rows) { + ret.rows.push_back( + write_batch_row{ + .key = r.key, + .value = r.value.copy(), + }); + } + return ret; +} + +std::expected +persist_manifest_update::build( + const lsm_state& state, + domain_uuid expected_uuid, + lsm_state::serialized_manifest manifest) { + persist_manifest_update update{ + .expected_uuid = expected_uuid, + .manifest = std::move(manifest), + }; + auto allowed = update.can_apply(state); + if (!allowed.has_value()) { + return std::unexpected(allowed.error()); + } + return update; +} + +std::expected +persist_manifest_update::can_apply(const lsm_state& state) const { + if (state.domain_uuid != expected_uuid) { + return std::unexpected( + lsm_update_error{fmt::format( + "Expected domain UUID: {}, actual: {}", + expected_uuid, + state.domain_uuid)}); + } + if (!state.persisted_manifest.has_value()) { + return std::monostate{}; + } + // Epoch and seqno should be monotonically increasing, so only accept them + // if they go up. + if ( + manifest.get_last_seqno() < state.persisted_manifest->get_last_seqno()) { + return std::unexpected( + lsm_update_error{fmt::format( + "Cannot persist manifest up to seqno {} which is below current " + "seqno {}", + manifest.get_last_seqno(), + state.persisted_manifest->get_last_seqno())}); + } + if ( + manifest.get_database_epoch() + < state.persisted_manifest->get_database_epoch()) { + return std::unexpected( + lsm_update_error{fmt::format( + "Cannot persist manifest from epoch {} which is below current " + "epoch {}", + manifest.get_database_epoch(), + state.persisted_manifest->get_database_epoch())}); + } + + return std::monostate{}; +} + +std::expected +persist_manifest_update::apply(lsm_state& state) { + auto allowed = can_apply(state); + if (!allowed.has_value()) { + return std::unexpected(allowed.error()); + } + + while (!state.volatile_buffer.empty() + && state.volatile_buffer.front().seqno + <= manifest.get_last_seqno()) { + state.volatile_buffer.pop_front(); + } + state.persisted_manifest = std::move(manifest); + return std::monostate{}; +} + +std::expected +set_domain_uuid_update::build(const lsm_state& state, domain_uuid uuid) { + set_domain_uuid_update update{ + .uuid = uuid, + }; + auto allowed = update.can_apply(state); + if (!allowed.has_value()) { + return std::unexpected(allowed.error()); + } + return update; +} + +std::expected +set_domain_uuid_update::can_apply(const lsm_state& state) const { + if (state.domain_uuid != domain_uuid{}) { + return std::unexpected( + lsm_update_error{fmt::format( + "Cannot set domain UUID: already set to {}", state.domain_uuid)}); + } + return std::monostate{}; +} + +std::expected +set_domain_uuid_update::apply(lsm_state& state) { + auto allowed = can_apply(state); + if (!allowed.has_value()) { + return std::unexpected(allowed.error()); + } + + state.domain_uuid = uuid; + + return std::monostate{}; +} + +std::expected +reset_manifest_update::build( + const lsm_state& state, + domain_uuid uuid, + std::optional new_manifest) { + reset_manifest_update update{ + .new_uuid = uuid, + .new_manifest = std::move(new_manifest), + }; + auto allowed = update.can_apply(state); + if (!allowed.has_value()) { + return std::unexpected(allowed.error()); + } + return update; +} + +std::expected +reset_manifest_update::can_apply(const lsm_state& state) const { + if (!state.volatile_buffer.empty()) { + return std::unexpected( + lsm_update_error{fmt::format( + "Cannot reset manifest: STM already has {} rows", + state.volatile_buffer.size())}); + } + if ( + state.persisted_manifest.has_value() + && state.persisted_manifest->last_seqno > lsm::sequence_number(0)) { + const auto& cur_manifest = *state.persisted_manifest; + return std::unexpected( + lsm_update_error{fmt::format( + "Cannot reset manifest: STM already has persisted manifest, epoch: " + "{}, seqno: {}", + cur_manifest.get_database_epoch(), + cur_manifest.get_last_seqno())}); + } + return std::monostate{}; +} + +std::expected reset_manifest_update::apply( + lsm_state& state, model::term_id t, model::offset o) { + auto allowed = can_apply(state); + if (!allowed.has_value()) { + return std::unexpected(allowed.error()); + } + dassert( + state.volatile_buffer.empty(), + "Expected volatile buffer to be empty: {}", + state.volatile_buffer.size()); + + state.domain_uuid = new_uuid; + int64_t persisted_seqno = new_manifest ? new_manifest->get_last_seqno() + : lsm::sequence_number(0); + int64_t persisted_epoch = new_manifest ? new_manifest->get_database_epoch() + : 0; + state.persisted_manifest = std::move(new_manifest); + + // Reset the deltas. These are set based on the persisted values, + // with the assumption that the STM is being initialized near raft offset 0. + // The deltas ensure that future raft offsets map correctly to database + // seqnos/epochs. + state.seqno_delta = persisted_seqno - o(); + state.db_epoch_delta = persisted_epoch - t(); + + return std::monostate{}; +} + +} // namespace cloud_topics::l1 diff --git a/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.h b/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.h new file mode 100644 index 0000000000000..6b2a7f8890ec8 --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.h @@ -0,0 +1,149 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "base/seastarx.h" +#include "cloud_topics/level_one/metastore/domain_uuid.h" +#include "cloud_topics/level_one/metastore/lsm/state.h" +#include "cloud_topics/level_one/metastore/lsm/write_batch_row.h" +#include "container/chunked_vector.h" +#include "model/fundamental.h" +#include "serde/envelope.h" +#include "utils/named_type.h" + +#include + +namespace cloud_topics::l1 { + +enum class lsm_update_key : uint8_t { + apply_write_batch = 0, + persist_manifest = 1, + set_domain_uuid = 2, + reset_manifest = 3, +}; + +using lsm_update_error = named_type; + +// Adds the given rows to the volatile buffer, assigning each row sequence +// numbers upon applying. +struct apply_write_batch_update + : public serde::envelope< + apply_write_batch_update, + serde::version<0>, + serde::compat_version<0>> { + friend bool + operator==(const apply_write_batch_update&, const apply_write_batch_update&) + = default; + auto serde_fields() { return std::tie(expected_uuid, rows); } + + static std::expected build( + const lsm_state&, + domain_uuid expected_uuid, + chunked_vector rows); + std::expected + can_apply(const lsm_state&) const; + std::expected + apply(lsm_state&, model::offset); + + apply_write_batch_update copy() const; + + domain_uuid expected_uuid; + chunked_vector rows; +}; + +// Sets the persisted_manifest, with the expectation that the manifest has been +// persisted to object storage. +struct persist_manifest_update + : public serde::envelope< + persist_manifest_update, + serde::version<0>, + serde::compat_version<0>> { + auto serde_fields() { return std::tie(expected_uuid, manifest); } + friend bool + operator==(const persist_manifest_update&, const persist_manifest_update&) + = default; + + static std::expected build( + const lsm_state&, domain_uuid, lsm_state::serialized_manifest manifest); + std::expected + can_apply(const lsm_state&) const; + std::expected apply(lsm_state&); + + domain_uuid expected_uuid; + lsm_state::serialized_manifest manifest; +}; + +// Sets the domain UUID. This should be the first operation performed to the +// STM, to ensure all replicas are operating on the same paths in object +// storage. +struct set_domain_uuid_update + : public serde::envelope< + set_domain_uuid_update, + serde::version<0>, + serde::compat_version<0>> { + friend bool + operator==(const set_domain_uuid_update&, const set_domain_uuid_update&) + = default; + auto serde_fields() { return std::tie(uuid); } + + static std::expected + build(const lsm_state&, domain_uuid); + std::expected + can_apply(const lsm_state&) const; + std::expected apply(lsm_state&); + + domain_uuid uuid; +}; + +// Resets the manifest to match the contents of the given serialized manifest +// (expected to be a manifest stored in shared object storage), and updates the +// domain UUID accordingly. This is meant to be used as the basis for recovery. +struct reset_manifest_update + : public serde::envelope< + reset_manifest_update, + serde::version<0>, + serde::compat_version<0>> { + auto serde_fields() { return std::tie(new_uuid, new_manifest); } + friend bool + operator==(const reset_manifest_update&, const reset_manifest_update&) + = default; + + static std::expected build( + const lsm_state&, + domain_uuid, + std::optional); + std::expected + can_apply(const lsm_state&) const; + std::expected + apply(lsm_state&, model::term_id, model::offset); + + domain_uuid new_uuid; + std::optional new_manifest; +}; +} // namespace cloud_topics::l1 + +template<> +struct fmt::formatter final + : fmt::formatter { + template + auto format( + const cloud_topics::l1::lsm_update_key& k, FormatContext& ctx) const { + switch (k) { + case cloud_topics::l1::lsm_update_key::apply_write_batch: + return formatter::format("apply_write_batch", ctx); + case cloud_topics::l1::lsm_update_key::persist_manifest: + return formatter::format("persist_manifest", ctx); + case cloud_topics::l1::lsm_update_key::set_domain_uuid: + return formatter::format("set_domain_uuid", ctx); + case cloud_topics::l1::lsm_update_key::reset_manifest: + return formatter::format("reset_manifest", ctx); + } + } +}; diff --git a/src/v/cloud_topics/level_one/metastore/lsm/state.cc b/src/v/cloud_topics/level_one/metastore/lsm/state.cc new file mode 100644 index 0000000000000..ea508ac7e2dbc --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/state.cc @@ -0,0 +1,60 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#include "cloud_topics/level_one/metastore/lsm/state.h" + +#include "serde/rw/envelope.h" +#include "serde/rw/iobuf.h" +#include "serde/rw/named_type.h" +#include "serde/rw/optional.h" +#include "serde/rw/sstring.h" +#include "serde/rw/uuid.h" +#include "serde/rw/vector.h" + +#include + +namespace cloud_topics::l1 { + +namespace { +std::deque copy_rows(const std::deque& rows) { + std::deque copy; + for (const auto& r : rows) { + copy.push_back( + volatile_row{ + .seqno = r.seqno, + .row = write_batch_row{ + .key = r.row.key, .value = r.row.value.copy()}}); + } + return copy; +} +} // namespace + +lsm_state::serialized_manifest lsm_state::serialized_manifest::copy() const { + return serialized_manifest{ + .buf = buf.copy(), + .last_seqno = last_seqno, + .database_epoch = database_epoch, + }; +} + +lsm_state lsm_state::copy() const { + std::optional manifest_copy; + if (persisted_manifest.has_value()) { + manifest_copy = persisted_manifest->copy(); + } + return lsm_state{ + .domain_uuid = domain_uuid, + .seqno_delta = seqno_delta, + .db_epoch_delta = db_epoch_delta, + .volatile_buffer = copy_rows(volatile_buffer), + .persisted_manifest = std::move(manifest_copy), + }; +} + +} // namespace cloud_topics::l1 diff --git a/src/v/cloud_topics/level_one/metastore/lsm/state.h b/src/v/cloud_topics/level_one/metastore/lsm/state.h new file mode 100644 index 0000000000000..e64335715b3f2 --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/state.h @@ -0,0 +1,127 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "bytes/iobuf.h" +#include "cloud_topics/level_one/metastore/domain_uuid.h" +#include "cloud_topics/level_one/metastore/lsm/write_batch_row.h" +#include "lsm/lsm.h" +#include "serde/envelope.h" + +#include + +namespace cloud_topics::l1 { + +struct volatile_row + : public serde:: + envelope, serde::compat_version<0>> { + friend bool operator==(const volatile_row&, const volatile_row&) = default; + auto serde_fields() { return std::tie(seqno, row); } + lsm::sequence_number seqno; + write_batch_row row; + + fmt::iterator format_to(fmt::iterator it) const { + return fmt::format_to(it, "{{seqno: {}, row: {}}}", seqno, row); + } +}; + +// State that backs a replicated database on object storage. This comprises of: +// - A UUID that uniquely identifies the database state, to ensure we only +// accept operations meant for the correct database. This gets updated when +// the state is ever restored from a manifest. +// - Deltas for the offset and term to be applied to seqno and epochs to ensure +// monotonicity for the underlying database. These may be non-zero after +// restoring a manifest. +// - A manifest that has been uploaded to object storage from which all +// replicas will be able to open a database. +// - A list of writes that need to be replayed on top of the manifest in order +// to be caught up. This list can be thought of as a write-ahead log for the +// database. +struct lsm_state + : public serde:: + envelope, serde::compat_version<0>> { + friend bool operator==(const lsm_state&, const lsm_state&) = default; + auto serde_fields() { + return std::tie( + domain_uuid, + seqno_delta, + db_epoch_delta, + volatile_buffer, + persisted_manifest); + } + lsm_state copy() const; + + // The unique identifier for this LSM state. This should be used as the + // basis for where the database writes its data and metadata. + // + // Must be set before any writes are accepted. May be reset if recovering + // from an existing manifest that is written with a different domain path + // (e.g. when recovering state from cloud). + domain_uuid domain_uuid{}; + + // Difference between Raft offset/term and database seqno/epoch. These will + // be non-zero upon recovery from object storage, since the restored + // seqno/epoch will not start at zero in that case. These may also be + // negative, in case recovery was performed on a non-empty Raft log. + // + // seqno = offset + seqno_delta + // epoch = term + epoch_delta + // + // TODO: use named types to enforce correct arithmetic rules. + int64_t seqno_delta{0}; + int64_t db_epoch_delta{0}; + + // Rows that aren't persisted to object storage yet but should be applied + // to the database. When opening a database from the persisted manifest, + // these rows must be applied to the opened database to catch it up. + std::deque volatile_buffer; + + // State that is persisted to object storage. This state contains write + // operations up to a given sequence number; operations from below that + // sequence number can be removed from the volatile buffer and no longer + // need to be replayed to the database when opening the database from this + // state. + // + // TODO: we store the serialized manifest because it allows us to copy + // synchronously without adding locks. If we want to maintain the manifest + // as raw proto, we will need to add a synchronous copy to protobuf, or add + // a lock around the manifest to safely async copy it. + struct serialized_manifest + : public serde::envelope< + serialized_manifest, + serde::version<0>, + serde::compat_version<0>> { + auto serde_fields() { + return std::tie(buf, last_seqno, database_epoch); + } + friend bool + operator==(const serialized_manifest&, const serialized_manifest&) + = default; + serialized_manifest copy() const; + lsm::sequence_number get_last_seqno() const { return last_seqno; } + lsm::internal::database_epoch get_database_epoch() const { + return database_epoch; + } + + iobuf buf; + lsm::sequence_number last_seqno; + lsm::internal::database_epoch database_epoch; + }; + std::optional persisted_manifest; +}; + +struct lsm_stm_snapshot + : public serde:: + envelope, serde::compat_version<0>> { + auto serde_fields() { return std::tie(state); } + lsm_state state; +}; + +} // namespace cloud_topics::l1 diff --git a/src/v/cloud_topics/level_one/metastore/lsm/stm.cc b/src/v/cloud_topics/level_one/metastore/lsm/stm.cc new file mode 100644 index 0000000000000..81ad94fbde6f2 --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/stm.cc @@ -0,0 +1,256 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "cloud_topics/level_one/metastore/lsm/stm.h" + +#include "cloud_topics/level_one/metastore/lsm/lsm_update.h" +#include "cloud_topics/level_one/metastore/lsm/state.h" +#include "cloud_topics/logger.h" +#include "model/record_batch_types.h" +#include "serde/async.h" +#include "ssx/future-util.h" +#include "utils/prefix_logger.h" + +#include + +#include + +namespace cloud_topics::l1 { + +namespace { +template +void maybe_log_update_error( + prefix_logger& log, + lsm_update_key key, + model::offset o, + const std::expected& r) { + if (r.has_value()) { + return; + } + // NOTE: inability to update the STM is not necessarily a bug! It indicates + // that this update's construction raced with another update that broke an + // invariant required to apply update. Expectation is that this update's + // caller constructs a new update and tries again if needed. + vlog( + log.debug, + "L1 LSM STM {} update at offset {} didn't apply: {}", + key, + o, + r.error()); +} +} // namespace + +stm::stm( + ss::logger& logger, + raft::consensus* raft, + config::binding snapshot_delay) + : metastore_stm_base("l1_lsm_stm.snapshot", logger, raft) + , snapshot_delay_secs_(std::move(snapshot_delay)) { + snapshot_timer_.set_callback([this] { write_snapshot_async(); }); +} + +ss::future> +stm::sync(model::timeout_clock::duration timeout) { + auto sync_res = co_await ss::coroutine::as_future( + metastore_stm_base::sync(timeout)); + if (sync_res.failed()) { + auto eptr = sync_res.get_exception(); + auto msg = fmt::format("Exception caught while syncing: {}", eptr); + if (ssx::is_shutdown_exception(eptr)) { + vlog(cd_log.debug, "Ignoring shutdown error: {}", msg); + co_return std::unexpected(errc::shutting_down); + } + vlog(cd_log.warn, "{}", msg); + co_return std::unexpected(errc::raft_error); + } + + if (!sync_res.get()) { + co_return std::unexpected(errc::not_leader); + } + // At this point we're guaranteed that this node is the leader and that the + // STM has been caught up in the term before this one (this doesn't mean + // the STM is currently caught up right now though, e.g. if there are + // in-flight updates from this term!) + co_return _insync_term; +} + +ss::future> stm::replicate_and_wait( + model::term_id term, model::record_batch batch, ss::abort_source& as) { + constexpr auto replicate_timeout = 10s; + auto opts = raft::replicate_options( + raft::consistency_level::quorum_ack, + term, + replicate_timeout, + std::ref(as)); + opts.set_force_flush(); + const auto offsets_delta = batch.header().last_offset_delta; + const auto res = co_await _raft->replicate(std::move(batch), opts); + if (res.has_error()) { + co_return std::unexpected(errc::raft_error); + } + auto last_offset = res.value().last_offset; + auto base_offset = model::offset(last_offset() - offsets_delta); + auto replicated_offset = res.value().last_offset; + auto success = co_await wait_no_throw( + replicated_offset, ss::lowres_clock::now() + 30s, as); + if (!success || _raft->term() != term) { + vlog( + cd_log.debug, + "Waiting for apply of batch [{}, {}] failed", + base_offset, + last_offset); + co_return std::unexpected(errc::raft_error); + } + co_return base_offset; +} + +ss::future<> stm::do_apply(const model::record_batch& batch) { + if (batch.header().type != model::record_batch_type::l1_stm) { + co_return; + } + + auto iter = model::record_batch_iterator::create(batch); + while (iter.has_next()) { + auto r = iter.next(); + auto key_buf = r.release_key(); + if (key_buf.size_bytes() == 0) { + continue; + } + + iobuf_parser key_parser(std::move(key_buf)); + auto key = serde::read(key_parser); + + iobuf value_buf = r.release_value(); + iobuf_parser value_parser(std::move(value_buf)); + + auto o = batch.base_offset() + model::offset_delta{r.offset_delta()}; + switch (key) { + case lsm_update_key::apply_write_batch: { + auto update = serde::read(value_parser); + auto result = update.apply(state_, o); + maybe_log_update_error(_log, key, o, result); + break; + } + case lsm_update_key::persist_manifest: { + auto update = co_await serde::read_async( + value_parser); + auto result = update.apply(state_); + maybe_log_update_error(_log, key, o, result); + break; + } + case lsm_update_key::set_domain_uuid: { + auto update = serde::read(value_parser); + auto result = update.apply(state_); + maybe_log_update_error(_log, key, o, result); + break; + } + case lsm_update_key::reset_manifest: { + auto update = co_await serde::read_async( + value_parser); + auto result = update.apply(state_, batch.term(), o); + maybe_log_update_error(_log, key, o, result); + break; + } + } + } + + rearm_snapshot_timer(); +} + +model::offset stm::max_removable_local_log_offset() { return {}; } + +ss::future +stm::apply_local_snapshot(raft::stm_snapshot_header, iobuf&& snapshot_buf) { + auto parser = iobuf_parser(std::move(snapshot_buf)); + auto snapshot = co_await serde::read_async(parser); + state_ = std::move(snapshot); + co_return raft::local_snapshot_applied::yes; +} + +ss::future +stm::take_local_snapshot(ssx::semaphore_units units) { + auto snapshot_offset = last_applied_offset(); + auto snapshot = co_await make_snapshot(); + units.return_all(); + iobuf snapshot_buf; + co_await serde::write_async(snapshot_buf, std::move(snapshot)); + co_return raft::stm_snapshot::create( + 0, snapshot_offset, std::move(snapshot_buf)); +} + +ss::future<> stm::apply_raft_snapshot(const iobuf& snapshot_buf) { + auto parser = iobuf_parser(snapshot_buf.copy()); + auto snapshot = co_await serde::read_async(parser); + state_ = std::move(snapshot); +} + +ss::future stm::take_raft_snapshot() { + iobuf snapshot_buf; + co_await serde::write_async(snapshot_buf, co_await make_snapshot()); + co_return std::move(snapshot_buf); +} + +ss::future stm::make_snapshot() const { + lsm_stm_snapshot snapshot; + snapshot.state = state_.copy(); + co_return snapshot; +} + +ss::future<> stm::stop() { + snapshot_timer_.cancel(); + co_await metastore_stm_base::stop(); +} + +ss::future<> stm::maybe_write_snapshot() { + if (_raft->last_snapshot_index() >= last_applied()) { + co_return; + } + auto snapshot = co_await _raft->stm_manager()->take_snapshot(); + vlog( + _log.debug, + "creating snapshot at offset: {}", + snapshot.last_included_offset); + co_await _raft->write_snapshot( + raft::write_snapshot_cfg( + snapshot.last_included_offset, std::move(snapshot.data))); +} + +void stm::write_snapshot_async() { + ssx::background = ssx::spawn_with_gate_then( + _gate, [this] { return maybe_write_snapshot(); }) + .handle_exception([this](const std::exception_ptr& e) { + vlog(_log.warn, "failed to write snapshot: {}", e); + }) + .finally([holder = _gate.hold()] {}); +} + +void stm::rearm_snapshot_timer() { + if (_gate.is_closed() || snapshot_timer_.armed()) { + return; + } + snapshot_timer_.arm(ss::lowres_clock::now() + snapshot_delay_secs_()); +} + +bool lsm_stm_factory::is_applicable_for( + const storage::ntp_config& ntp_cfg) const { + const auto& ntp = ntp_cfg.ntp(); + return (ntp.ns == model::kafka_internal_namespace) + && (ntp.tp.topic == model::l1_metastore_topic); +} + +void lsm_stm_factory::create( + raft::state_machine_manager_builder& builder, + raft::consensus* raft, + const cluster::stm_instance_config&) { + auto s = builder.create_stm(cd_log, raft, config::mock_binding(10s)); + raft->log()->stm_manager()->add_stm(std::move(s)); +} + +} // namespace cloud_topics::l1 diff --git a/src/v/cloud_topics/level_one/metastore/lsm/stm.h b/src/v/cloud_topics/level_one/metastore/lsm/stm.h new file mode 100644 index 0000000000000..963f36ac15b9b --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/stm.h @@ -0,0 +1,100 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "cloud_topics/level_one/metastore/lsm/state.h" +#include "cluster/state_machine_registry.h" +#include "model/fundamental.h" +#include "raft/persisted_stm.h" + +namespace cloud_topics::l1 { + +using metastore_stm_base = raft::persisted_stm_no_snapshot_at_offset<>; + +// A replicated state machine that backs an LSM database on object storage. +// This state machine applies optimistic concurrency control to ensure the +// updates go to proper database. Any additional concurrency control (e.g. +// locking to ensure application integrity) is left to callers. +class stm final : public metastore_stm_base { +public: + static constexpr std::string_view name = "l1_lsm_stm"; + enum class errc { + not_leader, + raft_error, + shutting_down, + }; + + explicit stm( + ss::logger&, raft::consensus*, config::binding); + raft::consensus* raft() { return _raft; } + + // Syncs the STM such that we're guaranteed that it has applied all records + // from the previous terms. Calling does _not_ ensure that all records from + // the current term have been applied. But it does establish for new + // leaders that they are up-to-date. + // + // Returns the current term. + ss::future> + sync(model::timeout_clock::duration timeout); + + // Replicates the given batch and waits for it to finish replicating. + // Success here does not guarantee that the replicated operation succeeded + // in updating the STM -- only that the apply was attempted. + ss::future> replicate_and_wait( + model::term_id, model::record_batch batch, ss::abort_source&); + + const lsm_state& state() const { return state_; } + + raft::stm_initial_recovery_policy + get_initial_recovery_policy() const final { + return raft::stm_initial_recovery_policy::read_everything; + } + +protected: + ss::future<> stop() override; + + ss::future make_snapshot() const; + + ss::future<> do_apply(const model::record_batch&) override; + + model::offset max_removable_local_log_offset() override; + + ss::future + apply_local_snapshot(raft::stm_snapshot_header, iobuf&& bytes) override; + + ss::future + take_local_snapshot(ssx::semaphore_units) override; + + ss::future<> apply_raft_snapshot(const iobuf&) final; + + ss::future take_raft_snapshot() final; + +private: + void rearm_snapshot_timer(); + void write_snapshot_async(); + ss::future<> maybe_write_snapshot(); + + // The deterministic state managed by this STM. + lsm_state state_; + config::binding snapshot_delay_secs_; + ss::timer snapshot_timer_; +}; + +class lsm_stm_factory : public cluster::state_machine_factory { +public: + lsm_stm_factory() = default; + bool is_applicable_for(const storage::ntp_config&) const final; + void create( + raft::state_machine_manager_builder&, + raft::consensus*, + const cluster::stm_instance_config&) final; +}; + +} // namespace cloud_topics::l1 diff --git a/src/v/cloud_topics/level_one/metastore/lsm/tests/BUILD b/src/v/cloud_topics/level_one/metastore/lsm/tests/BUILD new file mode 100644 index 0000000000000..13fcdc4de9e45 --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/tests/BUILD @@ -0,0 +1,18 @@ +load("//bazel:test.bzl", "redpanda_cc_gtest") + +redpanda_cc_gtest( + name = "lsm_update_test", + timeout = "short", + srcs = [ + "lsm_update_test.cc", + ], + deps = [ + "//src/v/cloud_topics/level_one/metastore/lsm:lsm_update", + "//src/v/cloud_topics/level_one/metastore/lsm:state", + "//src/v/cloud_topics/level_one/metastore/lsm:write_batch_row", + "//src/v/container:chunked_vector", + "//src/v/model", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + ], +) diff --git a/src/v/cloud_topics/level_one/metastore/lsm/tests/lsm_update_test.cc b/src/v/cloud_topics/level_one/metastore/lsm/tests/lsm_update_test.cc new file mode 100644 index 0000000000000..ed2a1a6133401 --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/tests/lsm_update_test.cc @@ -0,0 +1,351 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "cloud_topics/level_one/metastore/lsm/lsm_update.h" +#include "cloud_topics/level_one/metastore/lsm/state.h" +#include "cloud_topics/level_one/metastore/lsm/write_batch_row.h" + +#include + +using namespace cloud_topics::l1; + +namespace { + +// Helper to create a test domain UUID. +domain_uuid new_domain_uuid() { return domain_uuid(uuid_t::create()); } + +// Helper to create a serialized manifest with given epoch and seqno. +lsm_state::serialized_manifest +create_test_manifest(uint64_t epoch, lsm::sequence_number seqno) { + return lsm_state::serialized_manifest{ + .buf = iobuf::from("testbuf"), + .last_seqno = seqno, + .database_epoch = lsm::internal::database_epoch(epoch), + }; +} + +// Helper to create test rows. +chunked_vector create_rows(size_t count) { + chunked_vector rows; + for (size_t i = 0; i < count; ++i) { + rows.emplace_back( + write_batch_row{ + .key = fmt::format("key{}", i), + .value = iobuf::from(fmt::format("value{}", i)), + }); + } + return rows; +} + +} // namespace + +TEST(ApplyWriteBatchUpdateTest, TestApplyWriteBatchHappyPath) { + lsm_state state; + auto uuid = new_domain_uuid(); + state.domain_uuid = uuid; + + // Create some rows and apply them. + auto rows = create_rows(3); + auto update_res = apply_write_batch_update::build( + state, uuid, std::move(rows)); + ASSERT_TRUE(update_res.has_value()); + + auto& update = update_res.value(); + EXPECT_EQ(update.expected_uuid, uuid); + EXPECT_EQ(update.rows.size(), 3); + + auto apply_res = update.apply(state, model::offset(10)); + ASSERT_TRUE(apply_res.has_value()); + + // Verify rows were added to volatile buffer with correct sequence numbers. + EXPECT_EQ(state.volatile_buffer.size(), 3); + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(state.volatile_buffer[i].seqno, lsm::sequence_number(10)); + EXPECT_EQ(state.volatile_buffer[i].row.key, fmt::format("key{}", i)); + } +} + +TEST(ApplyWriteBatchUpdateTest, TestApplyWriteBatchDomainUuidMismatch) { + lsm_state state; + auto uuid1 = new_domain_uuid(); + auto uuid2 = new_domain_uuid(); + state.domain_uuid = uuid1; + + auto rows = create_rows(1); + // Try to build with wrong UUID. + auto update_res = apply_write_batch_update::build( + state, uuid2, std::move(rows)); + EXPECT_FALSE(update_res.has_value()); + EXPECT_NE( + update_res.error()().find("Expected domain UUID"), ss::sstring::npos); +} + +TEST(ApplyWriteBatchUpdateTest, TestApplyWriteBatchEmptyRows) { + lsm_state state; + auto uuid = new_domain_uuid(); + state.domain_uuid = uuid; + + chunked_vector empty_rows; + auto update_res = apply_write_batch_update::build( + state, uuid, std::move(empty_rows)); + EXPECT_FALSE(update_res.has_value()); + EXPECT_NE( + update_res.error()().find("Write batch is empty"), ss::sstring::npos); +} + +TEST(PersistManifestUpdateTest, TestPersistManifestHappyPath) { + lsm_state state; + auto uuid = new_domain_uuid(); + state.domain_uuid = uuid; + + // Add some volatile rows. + state.volatile_buffer.push_back( + volatile_row{ + .seqno = lsm::sequence_number(5), + .row = write_batch_row{.key = "key1", .value = iobuf::from("value1")}, + }); + state.volatile_buffer.push_back( + volatile_row{ + .seqno = lsm::sequence_number(10), + .row = write_batch_row{.key = "key2", .value = iobuf::from("value2")}, + }); + state.volatile_buffer.push_back( + volatile_row{ + .seqno = lsm::sequence_number(15), + .row = write_batch_row{.key = "key3", .value = iobuf::from("value3")}, + }); + + // Persist manifest up to seqno 10. + auto manifest = create_test_manifest(1, lsm::sequence_number(10)); + auto update_res = persist_manifest_update::build( + state, uuid, std::move(manifest)); + ASSERT_TRUE(update_res.has_value()); + + auto apply_res = update_res.value().apply(state); + ASSERT_TRUE(apply_res.has_value()); + + // Volatile buffer should be trimmed up to seqno 10. + EXPECT_EQ(state.volatile_buffer.size(), 1); + EXPECT_EQ(state.volatile_buffer.front().seqno, lsm::sequence_number(15)); + ASSERT_TRUE(state.persisted_manifest.has_value()); + EXPECT_EQ(state.persisted_manifest->get_last_seqno()(), 10); +} + +TEST(PersistManifestUpdateTest, TestPersistManifestDomainUuidMismatch) { + lsm_state state; + auto uuid1 = new_domain_uuid(); + auto uuid2 = new_domain_uuid(); + state.domain_uuid = uuid1; + + auto manifest = create_test_manifest(1, lsm::sequence_number(100)); + auto update_res = persist_manifest_update::build( + state, uuid2, std::move(manifest)); + EXPECT_FALSE(update_res.has_value()); + EXPECT_NE( + update_res.error()().find("Expected domain UUID"), ss::sstring::npos); +} + +TEST(PersistManifestUpdateTest, TestPersistManifestSeqnoNotMonotonic) { + lsm_state state; + auto uuid = new_domain_uuid(); + state.domain_uuid = uuid; + + // Set existing manifest with seqno 100. + state.persisted_manifest = create_test_manifest( + 1, lsm::sequence_number(100)); + + // Try to persist manifest with lower seqno. + { + auto manifest = create_test_manifest(1, lsm::sequence_number(99)); + auto update_res = persist_manifest_update::build( + state, uuid, std::move(manifest)); + EXPECT_FALSE(update_res.has_value()); + EXPECT_NE( + update_res.error()().find("below current seqno"), ss::sstring::npos); + } + // At the same seqno is allowed though (e.g. for compaction). + { + auto manifest = create_test_manifest(1, lsm::sequence_number(100)); + auto update_res = persist_manifest_update::build( + state, uuid, std::move(manifest)); + EXPECT_TRUE(update_res.has_value()); + } +} + +TEST(PersistManifestUpdateTest, TestPersistManifestEpochNotMonotonic) { + lsm_state state; + auto uuid = new_domain_uuid(); + state.domain_uuid = uuid; + + // Set existing manifest with a specified epoch. + state.persisted_manifest = create_test_manifest( + 4, lsm::sequence_number(100)); + + // Try to persist manifest with lower epoch. + { + auto manifest = create_test_manifest(3, lsm::sequence_number(100)); + auto update_res = persist_manifest_update::build( + state, uuid, std::move(manifest)); + EXPECT_FALSE(update_res.has_value()); + EXPECT_NE( + update_res.error()().find("below current epoch"), ss::sstring::npos); + } + // At the same epoch is allowed though. + { + auto manifest = create_test_manifest(4, lsm::sequence_number(100)); + auto update_res = persist_manifest_update::build( + state, uuid, std::move(manifest)); + EXPECT_TRUE(update_res.has_value()); + } +} + +TEST(SetDomainUuidUpdateTest, TestSetDomainUuidHappyPath) { + lsm_state state; + auto uuid = new_domain_uuid(); + auto update_res = set_domain_uuid_update::build(state, uuid); + ASSERT_TRUE(update_res.has_value()); + + auto apply_res = update_res.value().apply(state); + ASSERT_TRUE(apply_res.has_value()); + + EXPECT_EQ(state.domain_uuid, uuid); +} + +TEST(SetDomainUuidUpdateTest, TestSetDomainUuidAlreadySet) { + lsm_state state; + auto uuid1 = new_domain_uuid(); + auto uuid2 = new_domain_uuid(); + state.domain_uuid = uuid1; + + auto update_res = set_domain_uuid_update::build(state, uuid2); + EXPECT_FALSE(update_res.has_value()); + EXPECT_NE(update_res.error()().find("already set"), ss::sstring::npos); +} + +TEST(ResetManifestUpdateTest, TestResetManifestHappyPath) { + lsm_state state; + auto old_uuid = new_domain_uuid(); + state.domain_uuid = old_uuid; + + auto new_uuid = new_domain_uuid(); + auto manifest = create_test_manifest(5, lsm::sequence_number(1000)); + auto update_res = reset_manifest_update::build( + state, new_uuid, std::move(manifest)); + ASSERT_TRUE(update_res.has_value()); + + // Apply the reset at a different term and offset. + model::term_id log_term(3); + model::offset log_offset(50); + auto apply_res = update_res.value().apply(state, log_term, log_offset); + ASSERT_TRUE(apply_res.has_value()); + + EXPECT_EQ(state.domain_uuid, new_uuid); + ASSERT_TRUE(state.persisted_manifest.has_value()); + EXPECT_EQ(state.persisted_manifest->get_last_seqno()(), 1000); + EXPECT_EQ(state.persisted_manifest->get_database_epoch()(), 5); + + // Deltas should be computed correctly + EXPECT_EQ(state.seqno_delta, 1000 - 50); + EXPECT_EQ(state.db_epoch_delta, 5 - 3); + + // Add some rows. + auto rows = create_rows(1); + auto write_update_res = apply_write_batch_update::build( + state, new_uuid, std::move(rows)); + ASSERT_TRUE(write_update_res.has_value()); + auto write_apply_res = write_update_res.value().apply( + state, model::next_offset(log_offset)); + ASSERT_TRUE(write_apply_res.has_value()); + + // Since we applied the write at the next offset, the applied sequence + // number should be one above the restored. + ASSERT_EQ(1, state.volatile_buffer.size()); + EXPECT_EQ(state.volatile_buffer[0].seqno, lsm::sequence_number(1001)); +} + +TEST(ResetManifestUpdateTest, TestResetManifestEmptyManifest) { + lsm_state state; + auto old_uuid = new_domain_uuid(); + state.domain_uuid = old_uuid; + + auto new_uuid = new_domain_uuid(); + auto update_res = reset_manifest_update::build( + state, new_uuid, std::nullopt); + ASSERT_TRUE(update_res.has_value()); + + model::term_id log_term(2); + model::offset log_offset(10); + auto apply_res = update_res.value().apply(state, log_term, log_offset); + ASSERT_TRUE(apply_res.has_value()); + + // UUID should be updated + EXPECT_EQ(state.domain_uuid, new_uuid); + + // No manifest should be set + EXPECT_FALSE(state.persisted_manifest.has_value()); + + // Deltas should be computed with seqno and epoch = 0 + EXPECT_EQ(state.seqno_delta, -10); + EXPECT_EQ(state.db_epoch_delta, -2); + + // Add some rows. + auto rows = create_rows(1); + auto write_update_res = apply_write_batch_update::build( + state, new_uuid, std::move(rows)); + ASSERT_TRUE(write_update_res.has_value()); + auto write_apply_res = write_update_res.value().apply( + state, model::next_offset(log_offset)); + ASSERT_TRUE(write_apply_res.has_value()); + + // Since we applied the write at the next offset, the applied sequence + // number should be one above the restored. + ASSERT_EQ(1, state.volatile_buffer.size()); + EXPECT_EQ(state.volatile_buffer[0].seqno, lsm::sequence_number(1)); +} + +TEST(ResetManifestUpdateTest, TestResetManifestVolatileBufferNotEmpty) { + lsm_state state; + auto uuid = new_domain_uuid(); + state.domain_uuid = uuid; + + // Add volatile rows + state.volatile_buffer.push_back( + volatile_row{ + .seqno = lsm::sequence_number(1), + .row = write_batch_row{.key = "key1", .value = iobuf::from("value1")}, + }); + + auto new_uuid = new_domain_uuid(); + auto manifest = create_test_manifest(1, lsm::sequence_number(100)); + auto update_res = reset_manifest_update::build( + state, new_uuid, std::move(manifest)); + EXPECT_FALSE(update_res.has_value()); + EXPECT_NE(update_res.error()().find("already has"), ss::sstring::npos); + EXPECT_NE(update_res.error()().find("rows"), ss::sstring::npos); +} + +TEST(ResetManifestUpdateTest, TestResetManifestPersistedDataExists) { + lsm_state state; + auto uuid = new_domain_uuid(); + state.domain_uuid = uuid; + + // Set existing persisted manifest with seqno > 0 + state.persisted_manifest = create_test_manifest( + 2, lsm::sequence_number(500)); + + auto new_uuid = new_domain_uuid(); + auto manifest = create_test_manifest(1, lsm::sequence_number(100)); + auto update_res = reset_manifest_update::build( + state, new_uuid, std::move(manifest)); + EXPECT_FALSE(update_res.has_value()); + EXPECT_NE( + update_res.error()().find("already has persisted manifest"), + ss::sstring::npos); +} diff --git a/src/v/cloud_topics/level_one/metastore/lsm/write_batch_row.h b/src/v/cloud_topics/level_one/metastore/lsm/write_batch_row.h new file mode 100644 index 0000000000000..47efd9b46e4af --- /dev/null +++ b/src/v/cloud_topics/level_one/metastore/lsm/write_batch_row.h @@ -0,0 +1,38 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ +#pragma once + +#include "base/format_to.h" +#include "bytes/iobuf.h" +#include "serde/envelope.h" + +#include + +namespace cloud_topics::l1 { + +struct write_batch_row + : public serde:: + envelope, serde::compat_version<0>> { + friend bool operator==(const write_batch_row&, const write_batch_row&) + = default; + auto serde_fields() { return std::tie(key, value); } + + // NOTE: user-space key. + ss::sstring key; + + iobuf value; + + fmt::iterator format_to(fmt::iterator it) const { + return fmt::format_to( + it, "{{key: {}, value: {}}}", key, value.linearize_to_string()); + } +}; + +} // namespace cloud_topics::l1