Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/v/cloud_topics/level_one/metastore/lsm/lsm_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ apply_write_batch_update::apply(lsm_state& state, model::offset base_offset) {
return std::monostate{};
}

apply_write_batch_update apply_write_batch_update::copy() const {
apply_write_batch_update apply_write_batch_update::share() {
apply_write_batch_update ret{
.expected_uuid = expected_uuid,
};
for (const auto& r : rows) {
for (auto& r : rows) {
ret.rows.push_back(
write_batch_row{
.key = r.key,
.value = r.value.copy(),
.value = r.value.share(),
});
}
return ret;
Expand Down
2 changes: 1 addition & 1 deletion src/v/cloud_topics/level_one/metastore/lsm/lsm_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct apply_write_batch_update
std::expected<std::monostate, lsm_update_error>
apply(lsm_state&, model::offset);

apply_write_batch_update copy() const;
apply_write_batch_update share();

domain_uuid expected_uuid;
chunked_vector<write_batch_row> rows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ replicated_database::write(chunked_vector<write_batch_row> rows) {
builder.set_batch_type(model::record_batch_type::l1_stm);
builder.add_record(
{.key = serde::to_iobuf(lsm_update_key::apply_write_batch),
.value = serde::to_iobuf(update.value().copy())});
.value = serde::to_iobuf(update.value().share())});
auto batch = co_await std::move(builder).build();

auto replicate_result = co_await stm_->replicate_and_wait(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class replicated_metadata_persistence : public lsm::io::metadata_persistence {
// There is no persisted manifest.
co_return std::nullopt;
}
co_return _stm->state().persisted_manifest->buf.copy();
co_return _stm->mutable_state().persisted_manifest->buf.share();
}

ss::future<>
Expand Down
16 changes: 8 additions & 8 deletions src/v/cloud_topics/level_one/metastore/lsm/state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,37 @@
namespace cloud_topics::l1 {

namespace {
std::deque<volatile_row> copy_rows(const std::deque<volatile_row>& rows) {
std::deque<volatile_row> share_rows(std::deque<volatile_row>& rows) {
std::deque<volatile_row> copy;
for (const auto& r : rows) {
for (auto& r : rows) {
copy.push_back(
volatile_row{
.seqno = r.seqno,
.row = write_batch_row{
.key = r.row.key, .value = r.row.value.copy()}});
.key = r.row.key, .value = r.row.value.share()}});
}
return copy;
}
} // namespace

lsm_state::serialized_manifest lsm_state::serialized_manifest::copy() const {
lsm_state::serialized_manifest lsm_state::serialized_manifest::share() {
return serialized_manifest{
.buf = buf.copy(),
.buf = buf.share(),
.last_seqno = last_seqno,
.database_epoch = database_epoch,
};
}

lsm_state lsm_state::copy() const {
lsm_state lsm_state::share() {
std::optional<serialized_manifest> manifest_copy;
if (persisted_manifest.has_value()) {
manifest_copy = persisted_manifest->copy();
manifest_copy = persisted_manifest->share();
}
return lsm_state{
.domain_uuid = domain_uuid,
.seqno_delta = seqno_delta,
.db_epoch_delta = db_epoch_delta,
.volatile_buffer = copy_rows(volatile_buffer),
.volatile_buffer = share_rows(volatile_buffer),
.persisted_manifest = std::move(manifest_copy),
};
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_topics/level_one/metastore/lsm/state.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ struct lsm_state
volatile_buffer,
persisted_manifest);
}
lsm_state copy() const;
lsm_state share();

// Conversion between Redpanda space and LSM DB space.
model::term_id to_term(lsm::internal::database_epoch) const;
Expand Down Expand Up @@ -111,7 +111,7 @@ struct lsm_state
friend bool
operator==(const serialized_manifest&, const serialized_manifest&)
= default;
serialized_manifest copy() const;
serialized_manifest share();
lsm::sequence_number get_last_seqno() const { return last_seqno; }
lsm::internal::database_epoch get_database_epoch() const {
return database_epoch;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_topics/level_one/metastore/lsm/stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ ss::future<iobuf> stm::take_raft_snapshot() {
co_return std::move(snapshot_buf);
}

ss::future<lsm_stm_snapshot> stm::make_snapshot() const {
ss::future<lsm_stm_snapshot> stm::make_snapshot() {
lsm_stm_snapshot snapshot;
snapshot.state = state_.copy();
snapshot.state = state_.share();
co_return snapshot;
}

Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_topics/level_one/metastore/lsm/stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class stm final : public metastore_stm_base {
model::term_id, model::record_batch batch, ss::abort_source&);

const lsm_state& state() const { return state_; }
lsm_state& mutable_state() { return state_; }

raft::stm_initial_recovery_policy
get_initial_recovery_policy() const final {
Expand All @@ -62,7 +63,7 @@ class stm final : public metastore_stm_base {
protected:
ss::future<> stop() override;

ss::future<lsm_stm_snapshot> make_snapshot() const;
ss::future<lsm_stm_snapshot> make_snapshot();

ss::future<> do_apply(const model::record_batch&) override;

Expand Down