diff --git a/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.cc b/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.cc index 532d2802d5b9d..a9980fa866bcb 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.cc +++ b/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.cc @@ -67,15 +67,15 @@ apply_write_batch_update::apply(lsm_state& state, model::offset base_offset) { return std::monostate{}; } -apply_write_batch_update apply_write_batch_update::copy() const { +apply_write_batch_update apply_write_batch_update::share() { apply_write_batch_update ret{ .expected_uuid = expected_uuid, }; - for (const auto& r : rows) { + for (auto& r : rows) { ret.rows.push_back( write_batch_row{ .key = r.key, - .value = r.value.copy(), + .value = r.value.share(), }); } return ret; diff --git a/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.h b/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.h index 6b2a7f8890ec8..0735d897e21b5 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.h +++ b/src/v/cloud_topics/level_one/metastore/lsm/lsm_update.h @@ -52,7 +52,7 @@ struct apply_write_batch_update std::expected apply(lsm_state&, model::offset); - apply_write_batch_update copy() const; + apply_write_batch_update share(); domain_uuid expected_uuid; chunked_vector rows; diff --git a/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc b/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc index d81d9db2d3df2..1c5d95e179cbb 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc +++ b/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc @@ -188,7 +188,7 @@ replicated_database::write(chunked_vector rows) { builder.set_batch_type(model::record_batch_type::l1_stm); builder.add_record( {.key = serde::to_iobuf(lsm_update_key::apply_write_batch), - .value = serde::to_iobuf(update.value().copy())}); + .value = serde::to_iobuf(update.value().share())}); auto batch = co_await std::move(builder).build(); auto replicate_result = co_await stm_->replicate_and_wait( diff --git a/src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.cc b/src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.cc index 866b8fd316b19..0e9f2b7284a56 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.cc +++ b/src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.cc @@ -66,7 +66,7 @@ class replicated_metadata_persistence : public lsm::io::metadata_persistence { // There is no persisted manifest. co_return std::nullopt; } - co_return _stm->state().persisted_manifest->buf.copy(); + co_return _stm->mutable_state().persisted_manifest->buf.share(); } ss::future<> diff --git a/src/v/cloud_topics/level_one/metastore/lsm/state.cc b/src/v/cloud_topics/level_one/metastore/lsm/state.cc index 16b874295608f..32bedf96421ac 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/state.cc +++ b/src/v/cloud_topics/level_one/metastore/lsm/state.cc @@ -22,37 +22,37 @@ namespace cloud_topics::l1 { namespace { -std::deque copy_rows(const std::deque& rows) { +std::deque share_rows(std::deque& rows) { std::deque copy; - for (const auto& r : rows) { + for (auto& r : rows) { copy.push_back( volatile_row{ .seqno = r.seqno, .row = write_batch_row{ - .key = r.row.key, .value = r.row.value.copy()}}); + .key = r.row.key, .value = r.row.value.share()}}); } return copy; } } // namespace -lsm_state::serialized_manifest lsm_state::serialized_manifest::copy() const { +lsm_state::serialized_manifest lsm_state::serialized_manifest::share() { return serialized_manifest{ - .buf = buf.copy(), + .buf = buf.share(), .last_seqno = last_seqno, .database_epoch = database_epoch, }; } -lsm_state lsm_state::copy() const { +lsm_state lsm_state::share() { std::optional manifest_copy; if (persisted_manifest.has_value()) { - manifest_copy = persisted_manifest->copy(); + manifest_copy = persisted_manifest->share(); } return lsm_state{ .domain_uuid = domain_uuid, .seqno_delta = seqno_delta, .db_epoch_delta = db_epoch_delta, - .volatile_buffer = copy_rows(volatile_buffer), + .volatile_buffer = share_rows(volatile_buffer), .persisted_manifest = std::move(manifest_copy), }; } diff --git a/src/v/cloud_topics/level_one/metastore/lsm/state.h b/src/v/cloud_topics/level_one/metastore/lsm/state.h index 1f3fe7bdbf6da..5b3104c8ead49 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/state.h +++ b/src/v/cloud_topics/level_one/metastore/lsm/state.h @@ -57,7 +57,7 @@ struct lsm_state volatile_buffer, persisted_manifest); } - lsm_state copy() const; + lsm_state share(); // Conversion between Redpanda space and LSM DB space. model::term_id to_term(lsm::internal::database_epoch) const; @@ -111,7 +111,7 @@ struct lsm_state friend bool operator==(const serialized_manifest&, const serialized_manifest&) = default; - serialized_manifest copy() const; + serialized_manifest share(); lsm::sequence_number get_last_seqno() const { return last_seqno; } lsm::internal::database_epoch get_database_epoch() const { return database_epoch; diff --git a/src/v/cloud_topics/level_one/metastore/lsm/stm.cc b/src/v/cloud_topics/level_one/metastore/lsm/stm.cc index c8b0f0cd33c20..cdb49d2e0ee8b 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/stm.cc +++ b/src/v/cloud_topics/level_one/metastore/lsm/stm.cc @@ -198,9 +198,9 @@ ss::future stm::take_raft_snapshot() { co_return std::move(snapshot_buf); } -ss::future stm::make_snapshot() const { +ss::future stm::make_snapshot() { lsm_stm_snapshot snapshot; - snapshot.state = state_.copy(); + snapshot.state = state_.share(); co_return snapshot; } diff --git a/src/v/cloud_topics/level_one/metastore/lsm/stm.h b/src/v/cloud_topics/level_one/metastore/lsm/stm.h index 8ae4920080fe4..4084dcc49422f 100644 --- a/src/v/cloud_topics/level_one/metastore/lsm/stm.h +++ b/src/v/cloud_topics/level_one/metastore/lsm/stm.h @@ -53,6 +53,7 @@ class stm final : public metastore_stm_base { model::term_id, model::record_batch batch, ss::abort_source&); const lsm_state& state() const { return state_; } + lsm_state& mutable_state() { return state_; } raft::stm_initial_recovery_policy get_initial_recovery_policy() const final { @@ -62,7 +63,7 @@ class stm final : public metastore_stm_base { protected: ss::future<> stop() override; - ss::future make_snapshot() const; + ss::future make_snapshot(); ss::future<> do_apply(const model::record_batch&) override;