Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/v/cloud_topics/level_one/metastore/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ redpanda_cc_rpc_library(
src = "rpc.json",
)

# Header-only target (no `srcs`) that exports `domain_uuid.h`. The header
# includes `utils/named_type.h` and `utils/uuid.h`, which is why both utils
# targets are listed as public `deps`.
redpanda_cc_library(
name = "domain_uuid",
hdrs = [
"domain_uuid.h",
],
visibility = ["//visibility:public"],
deps = [
"//src/v/utils:named_type",
"//src/v/utils:uuid",
],
)

redpanda_cc_library(
name = "garbage_collector",
srcs = [
Expand Down
19 changes: 19 additions & 0 deletions src/v/cloud_topics/level_one/metastore/domain_uuid.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include "utils/named_type.h"
#include "utils/uuid.h"

namespace cloud_topics::l1 {

// A `uuid_t` wrapped in `named_type` with its own tag (`domain_uuid_tag`),
// giving it a type distinct from a bare `uuid_t`.
// NOTE(review): presumably this prevents accidental mixing with other
// uuid_t-based identifiers — confirm against `named_type`'s conversion rules.
using domain_uuid = named_type<uuid_t, struct domain_uuid_tag>;

} // namespace cloud_topics::l1
45 changes: 45 additions & 0 deletions src/v/cloud_topics/level_one/metastore/lsm/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
load("//bazel:build.bzl", "redpanda_cc_library")

# Header-only target (no `srcs`) that exports `write_batch_row.h`.
#
# Every header that `write_batch_row.h` includes directly is backed by a direct
# dep, so the target does not rely on transitive dependencies:
#   - "base/format_to.h" -> //src/v/base
#   - "bytes/iobuf.h"    -> //src/v/bytes:iobuf
#   - "serde/envelope.h" -> //src/v/serde
#   - <fmt/core.h>       -> @fmt
#   - ss::sstring        -> @seastar
redpanda_cc_library(
    name = "write_batch_row",
    hdrs = [
        "write_batch_row.h",
    ],
    visibility = ["//visibility:public"],
    deps = [
        "//src/v/base",
        "//src/v/bytes:iobuf",
        "//src/v/serde",
        "@fmt",
        "@seastar",
    ],
)

# Library for `state.h` / `state.cc`.
#
# The serde read/write targets are `implementation_deps` because their headers
# (`serde/rw/*.h`) are only included from `state.cc`, not from the public
# `state.h`.
#
# NOTE(review): `iobuf_parser`, `chunked_vector` and `model` are listed as
# public deps but are not included by `state.h` or `state.cc` — confirm they
# are still needed.
redpanda_cc_library(
name = "state",
srcs = [
"state.cc",
],
hdrs = [
"state.h",
],
implementation_deps = [
"//src/v/serde:iobuf",
"//src/v/serde:named_type",
"//src/v/serde:optional",
"//src/v/serde:sstring",
"//src/v/serde:uuid",
"//src/v/serde:vector",
],
visibility = ["//visibility:public"],
deps = [
":write_batch_row",
"//src/v/base",
"//src/v/bytes:iobuf",
"//src/v/bytes:iobuf_parser",
"//src/v/cloud_topics/level_one/metastore:domain_uuid",
"//src/v/container:chunked_vector",
"//src/v/lsm",
"//src/v/model",
"//src/v/serde",
"@seastar",
],
)
60 changes: 60 additions & 0 deletions src/v/cloud_topics/level_one/metastore/lsm/state.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "cloud_topics/level_one/metastore/lsm/state.h"

#include "serde/rw/envelope.h"
#include "serde/rw/iobuf.h"
#include "serde/rw/named_type.h"
#include "serde/rw/optional.h"
#include "serde/rw/sstring.h"
#include "serde/rw/uuid.h"
#include "serde/rw/vector.h"

#include <seastar/core/coroutine.hh>

namespace cloud_topics::l1 {

namespace {
// Builds a new deque holding a copy of every row in `rows`, in the same order.
// Each element is rebuilt field by field: `seqno` and `key` are copied by
// value, while `value` (an iobuf) is duplicated through its explicit `copy()`
// method.
// NOTE(review): presumably the element-wise rebuild is needed because iobuf is
// not implicitly copyable — confirm.
std::deque<volatile_row> copy_rows(const std::deque<volatile_row>& rows) {
std::deque<volatile_row> copy;
for (const auto& r : rows) {
copy.push_back(
volatile_row{
.seqno = r.seqno,
.row = write_batch_row{
.key = r.row.key, .value = r.row.value.copy()}});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

share?

}
return copy;
}
} // namespace

// Returns a duplicate of this manifest. The serialized buffer is duplicated
// through iobuf's explicit `copy()`; the sequence number and database epoch
// are copied by value.
lsm_state::serialized_manifest lsm_state::serialized_manifest::copy() const {
    serialized_manifest duplicate;
    duplicate.buf = buf.copy();
    duplicate.last_seqno = last_seqno;
    duplicate.database_epoch = database_epoch;
    return duplicate;
}

// Returns a duplicate of this state. Plain-value fields are copied directly,
// the volatile buffer is duplicated row by row via `copy_rows`, and the
// persisted manifest (when present) is duplicated via its own `copy()`.
lsm_state lsm_state::copy() const {
    lsm_state duplicate;
    duplicate.domain_uuid = domain_uuid;
    duplicate.seqno_delta = seqno_delta;
    duplicate.db_epoch_delta = db_epoch_delta;
    duplicate.volatile_buffer = copy_rows(volatile_buffer);
    // Left as std::nullopt when there is no persisted manifest to duplicate.
    if (persisted_manifest) {
        duplicate.persisted_manifest = persisted_manifest->copy();
    }
    return duplicate;
}

} // namespace cloud_topics::l1
120 changes: 120 additions & 0 deletions src/v/cloud_topics/level_one/metastore/lsm/state.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include "bytes/iobuf.h"
#include "cloud_topics/level_one/metastore/domain_uuid.h"
#include "cloud_topics/level_one/metastore/lsm/write_batch_row.h"
#include "lsm/lsm.h"
#include "serde/envelope.h"

#include <deque>

namespace cloud_topics::l1 {

// One write paired with a sequence number. `lsm_state` keeps these in its
// volatile buffer.
struct volatile_row
  : public serde::
      envelope<volatile_row, serde::version<0>, serde::compat_version<0>> {
    // Sequence number stored alongside `row`.
    lsm::sequence_number seqno;
    // Key/value payload of the write.
    write_batch_row row;

    // Hands both fields to serde as a tuple of references.
    auto serde_fields() { return std::tie(seqno, row); }

    // Writes `{seqno: <seqno>, row: <row>}` to the given fmt output iterator.
    fmt::iterator format_to(fmt::iterator out) const {
        return fmt::format_to(out, "{{seqno: {}, row: {}}}", seqno, row);
    }

    friend bool operator==(const volatile_row&, const volatile_row&) = default;
};

// State that backs a replicated database on object storage. This comprises:
// - A UUID that uniquely identifies the database state, to ensure we only
// accept operations meant for the correct database. This gets updated
// whenever the state is restored from a manifest.
// - Deltas that are added to the Raft offset and term to derive the database
// seqno and epoch, ensuring monotonicity for the underlying database. These
// may be non-zero after restoring a manifest.
// - A manifest that has been uploaded to object storage from which all
// replicas will be able to open a database.
// - A list of writes that need to be replayed on top of the manifest in order
// to be caught up. This list can be thought of as a write-ahead log for the
// database.
// Serde envelope (version 0, compat version 0) bundling the domain UUID, the
// seqno/epoch deltas, the buffer of not-yet-persisted rows, and the optional
// persisted manifest.
struct lsm_state
: public serde::
envelope<lsm_state, serde::version<0>, serde::compat_version<0>> {
friend bool operator==(const lsm_state&, const lsm_state&) = default;
// Hands all five fields to serde as a tuple of references.
auto serde_fields() {
return std::tie(
domain_uuid,
seqno_delta,
db_epoch_delta,
volatile_buffer,
persisted_manifest);
}
// Returns a duplicate of this state. Defined out of line in state.cc, where
// the iobuf-holding members are duplicated through explicit `copy()` calls.
lsm_state copy() const;

// The unique identifier for this LSM state. This should be used as the
// basis for where the database writes its data and metadata.
//
// Must be set before any writes are accepted. May be reset if recovering
// from an existing manifest that is written with a different domain path
// (e.g. when recovering state from cloud).
domain_uuid domain_uuid{};

// Difference between Raft offset/term and database seqno/epoch. These will
// be non-zero upon recovery from object storage, since the restored
// seqno/epoch will not start at zero in that case. These may also be
// negative, in case recovery was performed on a non-empty Raft log.
//
// seqno = offset + seqno_delta
// epoch = term + db_epoch_delta
//
// TODO: use named types to enforce correct arithmetic rules.
int64_t seqno_delta{0};
int64_t db_epoch_delta{0};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it simpler to seed the start offset and term with correct values during the recovery?
With delta's most of the code will always run and will mostly be tested with both deltas set to zero. If one of these values is not taken into account it'll be easy for the bug to be unnoticed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's simpler because that means we need to know those values at the time of partition creation. Fair concern about testing mostly with zero -- I'll give it some thought if there are ways to make this more bulletproof, though my hope is that this STM is kept fairly simple and doesn't gain too much more complexity around seqnos and epochs..


// Rows that aren't persisted to object storage yet but should be applied
// to the database. When opening a database from the persisted manifest,
// these rows must be applied to the opened database to catch it up.
std::deque<volatile_row> volatile_buffer;
Comment on lines +81 to +84
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: having the volatile buffer be in memory isn't a hard requirement. Another implementation would be to have leaders read the Raft log to replay the write batches, and have the readers filter out at replay time the rows that don't belong to the current domain. I implemented it with it explicitly being in memory because I found it easier to reason about correctness, but it shouldn't be too difficult to remove (just a matter of correctly accounting for the max removable local log offset)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this double the write buffer memory usage or will we share out of these bytes?

Should this be a chunked fifo instead of a deque


// State that is persisted to object storage. This state contains write
// operations up to a given sequence number; operations from below that
// sequence number can be removed from the volatile buffer and no longer
// need to be replayed to the database when opening the database from this
// state.
//
// TODO: we store the serialized manifest because it allows us to copy
// synchronously without adding locks. If we want to maintain the manifest
// as raw proto, we will need to add a synchronous copy to protobuf, or add
// a lock around the manifest to safely async copy it.
struct serialized_manifest
: public serde::envelope<
serialized_manifest,
serde::version<0>,
serde::compat_version<0>> {
// Hands the three fields to serde as a tuple of references.
auto serde_fields() {
return std::tie(buf, last_seqno, database_epoch);
}
friend bool
operator==(const serialized_manifest&, const serialized_manifest&)
= default;
// Returns a duplicate of this manifest; defined out of line in state.cc.
serialized_manifest copy() const;
// Plain accessors returning `last_seqno` / `database_epoch` by value.
lsm::sequence_number get_last_seqno() const { return last_seqno; }
lsm::internal::database_epoch get_database_epoch() const {
return database_epoch;
}

// The manifest in its serialized form.
iobuf buf;
lsm::sequence_number last_seqno;
lsm::internal::database_epoch database_epoch;
};
// Empty when no manifest is held.
std::optional<serialized_manifest> persisted_manifest;
};

} // namespace cloud_topics::l1
38 changes: 38 additions & 0 deletions src/v/cloud_topics/level_one/metastore/lsm/write_batch_row.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include "base/format_to.h"
#include "bytes/iobuf.h"
#include "serde/envelope.h"

#include <fmt/core.h>

namespace cloud_topics::l1 {

// A single key/value pair, wrapped in a serde envelope (version 0, compat
// version 0).
struct write_batch_row
: public serde::
envelope<write_batch_row, serde::version<0>, serde::compat_version<0>> {
friend bool operator==(const write_batch_row&, const write_batch_row&)
= default;
// Hands both fields to serde as a tuple of references.
auto serde_fields() { return std::tie(key, value); }

// NOTE: user-space key.
ss::sstring key;

// Payload stored for `key`, held as an iobuf.
iobuf value;

// Writes `{key: <key>, value: <value>}` to the given fmt output iterator.
// The value is flattened into a single string with `linearize_to_string()`.
// NOTE(review): per the author's review reply, `linearize_to_string()`
// throws for values over 128KiB and this formatter is only used in tests —
// confirm before using it on a production logging path.
fmt::iterator format_to(fmt::iterator it) const {
return fmt::format_to(
it, "{{key: {}, value: {}}}", key, value.linearize_to_string());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe limit the value length?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

linearize_to_string() will throw if the value is over 128KiB. Also worth noting (and maybe I'll add a comment) this is only used in tests

}
};

} // namespace cloud_topics::l1