Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/v/cloud_topics/level_one/metastore/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ redpanda_cc_rpc_library(
src = "rpc.json",
)

redpanda_cc_library(
name = "domain_uuid",
hdrs = [
"domain_uuid.h",
],
visibility = ["//visibility:public"],
deps = [
"//src/v/utils:named_type",
"//src/v/utils:uuid",
],
)

redpanda_cc_library(
name = "garbage_collector",
srcs = [
Expand Down
19 changes: 19 additions & 0 deletions src/v/cloud_topics/level_one/metastore/domain_uuid.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include "utils/named_type.h"
#include "utils/uuid.h"

namespace cloud_topics::l1 {

using domain_uuid = named_type<uuid_t, struct domain_uuid_tag>;

} // namespace cloud_topics::l1
73 changes: 73 additions & 0 deletions src/v/cloud_topics/level_one/metastore/lsm/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
load("//bazel:build.bzl", "redpanda_cc_library")

redpanda_cc_library(
name = "write_batch_row",
hdrs = [
"write_batch_row.h",
],
visibility = ["//visibility:public"],
deps = [
"//src/v/bytes:iobuf",
"//src/v/serde",
"@seastar",
],
)

redpanda_cc_library(
name = "state",
srcs = [
"state.cc",
],
hdrs = [
"state.h",
],
implementation_deps = [
"//src/v/serde:iobuf",
"//src/v/serde:named_type",
"//src/v/serde:optional",
"//src/v/serde:sstring",
"//src/v/serde:uuid",
"//src/v/serde:vector",
],
visibility = ["//visibility:public"],
deps = [
":write_batch_row",
"//src/v/base",
"//src/v/bytes:iobuf",
"//src/v/bytes:iobuf_parser",
"//src/v/cloud_topics/level_one/metastore:domain_uuid",
"//src/v/container:chunked_vector",
"//src/v/lsm",
"//src/v/model",
"//src/v/serde",
"@seastar",
],
)

redpanda_cc_library(
name = "lsm_update",
srcs = [
"lsm_update.cc",
],
hdrs = [
"lsm_update.h",
],
implementation_deps = [
"//src/v/cloud_topics/level_one/metastore:domain_uuid",
"//src/v/serde:iobuf",
"//src/v/serde:named_type",
"//src/v/serde:optional",
"//src/v/serde:uuid",
],
visibility = ["//visibility:public"],
deps = [
":state",
":write_batch_row",
"//src/v/base",
"//src/v/container:chunked_vector",
"//src/v/model",
"//src/v/serde",
"//src/v/utils:named_type",
"@seastar",
],
)
253 changes: 253 additions & 0 deletions src/v/cloud_topics/level_one/metastore/lsm/lsm_update.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/*
* Copyright 2025 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "cloud_topics/level_one/metastore/lsm/lsm_update.h"

#include "cloud_topics/level_one/metastore/domain_uuid.h"
#include "model/fundamental.h"
#include "serde/rw/iobuf.h"
#include "serde/rw/named_type.h"
#include "serde/rw/optional.h"
#include "serde/rw/uuid.h"

#include <seastar/core/coroutine.hh>

namespace cloud_topics::l1 {

std::expected<apply_write_batch_update, lsm_update_error>
apply_write_batch_update::build(
const lsm_state& state,
domain_uuid expected_uuid,
chunked_vector<write_batch_row> rows) {
apply_write_batch_update update{
.expected_uuid = expected_uuid,
.rows = std::move(rows),
};
auto allowed = update.can_apply(state);
if (!allowed.has_value()) {
return std::unexpected(allowed.error());
}
return update;
}

std::expected<std::monostate, lsm_update_error>
apply_write_batch_update::can_apply(const lsm_state& state) const {
if (state.domain_uuid != expected_uuid) {
return std::unexpected(
lsm_update_error{fmt::format(
"Expected domain UUID: {}, actual: {}",
expected_uuid,
state.domain_uuid)});
}
if (rows.empty()) {
return std::unexpected(lsm_update_error{"Write batch is empty"});
}
return std::monostate{};
}

std::expected<std::monostate, lsm_update_error>
apply_write_batch_update::apply(lsm_state& state, model::offset base_offset) {
auto allowed = can_apply(state);
if (!allowed.has_value()) {
return std::unexpected(allowed.error());
}
lsm::sequence_number seqno(base_offset() + state.seqno_delta);
for (auto& row : rows) {
state.volatile_buffer.push_back(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so in the ctp stm there is a lot of concerns with idempotency. I think this is only a concern with local snapshots or something? Anyways, is that either not applicable or how do we not hit that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idempotency and general concurrency is left up to callers.

My thinking is that it will look like:

  • open a given database in term T
  • take a lock to ensure only one fiber is replicating at a time (for the sake of example, this could be a lock on the database, but could be finer grained, like a row lock)
  • check invariants of the database while the lock is held
  • replicate the writes in term T
  • if there is a timeout, retry replication without dropping the lock to ensure the writes make it through (or step down as leader)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean in terms of the snapshots and log replays. I think the answer is that we don't support snapshots at offsets, so we don't have to worry about it? The ctp stm does but because the updates are applied idempotently it's okay to replay old state.

I think in this case snapshots are going to require us to re-open the database to discard writes that might have happened while it was an old leader

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in this case snapshots are going to require us to re-open the database to discard writes that might have happened while it was an old leader

Ah, I think I understand what you're asking. You're right, this STM doesn't support snapshots at offsets. And yes, when we become leader we'll need to sync the STM and reopen the database to ensure we don't miss out on writes from previous terms.

volatile_row{.seqno = seqno, .row = std::move(row)});
Comment on lines +63 to +64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when we apply manifest update we pop off seq no it looks like there is an implied monotonic order. but do we enforce that here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chatted on slack, it's somewhat implied by virtue of us calling apply() with the Raft log offset of the update

}

return std::monostate{};
}

apply_write_batch_update apply_write_batch_update::copy() const {
apply_write_batch_update ret{
.expected_uuid = expected_uuid,
};
for (const auto& r : rows) {
ret.rows.push_back(
write_batch_row{
.key = r.key,
.value = r.value.copy(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

share?

});
}
return ret;
}

std::expected<persist_manifest_update, lsm_update_error>
persist_manifest_update::build(
const lsm_state& state,
domain_uuid expected_uuid,
lsm_state::serialized_manifest manifest) {
persist_manifest_update update{
.expected_uuid = expected_uuid,
.manifest = std::move(manifest),
};
auto allowed = update.can_apply(state);
if (!allowed.has_value()) {
return std::unexpected(allowed.error());
}
return update;
}

std::expected<std::monostate, lsm_update_error>
persist_manifest_update::can_apply(const lsm_state& state) const {
if (state.domain_uuid != expected_uuid) {
return std::unexpected(
lsm_update_error{fmt::format(
"Expected domain UUID: {}, actual: {}",
expected_uuid,
state.domain_uuid)});
}
if (!state.persisted_manifest.has_value()) {
return std::monostate{};
}
// Epoch and seqno should be monotonically increasing, so only accept them
// if they go up.
if (
manifest.get_last_seqno() < state.persisted_manifest->get_last_seqno()) {
return std::unexpected(
lsm_update_error{fmt::format(
"Cannot persist manifest up to seqno {} which is below current "
"seqno {}",
manifest.get_last_seqno(),
state.persisted_manifest->get_last_seqno())});
}
if (
manifest.get_database_epoch()
< state.persisted_manifest->get_database_epoch()) {
return std::unexpected(
lsm_update_error{fmt::format(
"Cannot persist manifest from epoch {} which is below current "
"epoch {}",
manifest.get_database_epoch(),
state.persisted_manifest->get_database_epoch())});
}

return std::monostate{};
}

std::expected<std::monostate, lsm_update_error>
persist_manifest_update::apply(lsm_state& state) {
auto allowed = can_apply(state);
if (!allowed.has_value()) {
return std::unexpected(allowed.error());
}

while (!state.volatile_buffer.empty()
&& state.volatile_buffer.front().seqno
<= manifest.get_last_seqno()) {
state.volatile_buffer.pop_front();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is basically a flush operation, right? Everything below or equal to manifest.get_last_seqno() should be applied.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a valid scenario in which the volatile_buffer is not empty after this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is basically a flush operation, right

That's right

Is there a valid scenario in which the volatile_buffer is not empty after this?

Yeah, I think something like this:

  1. write row1 at seqno1
  2. call flush on db
  3. manifest is uploaded with row1, its applied seqno is seqno1
  4. write row2 at seqno2
  5. manifest is replicated and applied to the STM
  6. volatile buffer removes row1 but keeps row2

}
state.persisted_manifest = std::move(manifest);
return std::monostate{};
}

std::expected<set_domain_uuid_update, lsm_update_error>
set_domain_uuid_update::build(const lsm_state& state, domain_uuid uuid) {
set_domain_uuid_update update{
.uuid = uuid,
};
auto allowed = update.can_apply(state);
if (!allowed.has_value()) {
return std::unexpected(allowed.error());
}
return update;
}

std::expected<std::monostate, lsm_update_error>
set_domain_uuid_update::can_apply(const lsm_state& state) const {
if (state.domain_uuid != domain_uuid{}) {
return std::unexpected(
lsm_update_error{fmt::format(
"Cannot set domain UUID: already set to {}", state.domain_uuid)});
}
return std::monostate{};
}

std::expected<std::monostate, lsm_update_error>
set_domain_uuid_update::apply(lsm_state& state) {
auto allowed = can_apply(state);
if (!allowed.has_value()) {
return std::unexpected(allowed.error());
}

state.domain_uuid = uuid;

return std::monostate{};
}

std::expected<reset_manifest_update, lsm_update_error>
reset_manifest_update::build(
const lsm_state& state,
domain_uuid uuid,
std::optional<lsm_state::serialized_manifest> new_manifest) {
reset_manifest_update update{
.new_uuid = uuid,
.new_manifest = std::move(new_manifest),
};
auto allowed = update.can_apply(state);
if (!allowed.has_value()) {
return std::unexpected(allowed.error());
}
return update;
}

std::expected<std::monostate, lsm_update_error>
reset_manifest_update::can_apply(const lsm_state& state) const {
if (!state.volatile_buffer.empty()) {
return std::unexpected(
lsm_update_error{fmt::format(
"Cannot reset manifest: STM already has {} rows",
state.volatile_buffer.size())});
}
if (
state.persisted_manifest.has_value()
&& state.persisted_manifest->last_seqno > lsm::sequence_number(0)) {
const auto& cur_manifest = *state.persisted_manifest;
return std::unexpected(
lsm_update_error{fmt::format(
"Cannot reset manifest: STM already has persisted manifest, epoch: "
"{}, seqno: {}",
cur_manifest.get_database_epoch(),
cur_manifest.get_last_seqno())});
}
return std::monostate{};
}

std::expected<std::monostate, lsm_update_error> reset_manifest_update::apply(
lsm_state& state, model::term_id t, model::offset o) {
auto allowed = can_apply(state);
if (!allowed.has_value()) {
return std::unexpected(allowed.error());
}
dassert(
state.volatile_buffer.empty(),
"Expected volatile buffer to be empty: {}",
state.volatile_buffer.size());

state.domain_uuid = new_uuid;
int64_t persisted_seqno = new_manifest ? new_manifest->get_last_seqno()
: lsm::sequence_number(0);
int64_t persisted_epoch = new_manifest ? new_manifest->get_database_epoch()
: 0;
state.persisted_manifest = std::move(new_manifest);

// Reset the deltas. These are set based on the persisted values,
// with the assumption that the STM is being initialized near raft offset 0.
// The deltas ensure that future raft offsets map correctly to database
// seqnos/epochs.
state.seqno_delta = persisted_seqno - o();
state.db_epoch_delta = persisted_epoch - t();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it makes sense to add an assertion or logging for the case if current offset or current term is greater than persisted one. Otherwise it'd be an UB.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's valid for the deltas to be negative, or do you mean something else?


return std::monostate{};
}

} // namespace cloud_topics::l1
Loading