diff --git a/src/v/cloud_topics/level_one/metastore/lsm/BUILD b/src/v/cloud_topics/level_one/metastore/lsm/BUILD
index 1e3728f164346..49a7e4d07f441 100644
--- a/src/v/cloud_topics/level_one/metastore/lsm/BUILD
+++ b/src/v/cloud_topics/level_one/metastore/lsm/BUILD
@@ -142,6 +142,67 @@ redpanda_cc_library(
     ],
 )
 
+redpanda_cc_library(
+    name = "replicated_db",
+    srcs = [
+        "replicated_db.cc",
+    ],
+    hdrs = [
+        "replicated_db.h",
+    ],
+    implementation_deps = [
+        ":replicated_persistence",
+        "//src/v/cloud_topics:logger",
+        "//src/v/lsm/io:cloud_persistence",
+        "//src/v/model:batch_builder",
+        "//src/v/serde",
+        "//src/v/ssx:clock",
+        "//src/v/ssx:time",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":lsm_update",
+        ":stm",
+        ":write_batch_row",
+        "//src/v/cloud_io:remote",
+        "//src/v/cloud_storage_clients",
+        "//src/v/container:chunked_vector",
+        "//src/v/lsm",
+        "//src/v/lsm/io:persistence",
+        "//src/v/lsm/proto:manifest_redpanda_proto",
+        "//src/v/model",
+        "@seastar",
+    ],
+)
+
+redpanda_cc_library(
+    name = "replicated_persistence",
+    srcs = [
+        "replicated_persistence.cc",
+    ],
+    hdrs = [
+        "replicated_persistence.h",
+    ],
+    implementation_deps = [
+        "//src/v/lsm",
+        "//src/v/lsm/io:cloud_persistence",
+        "//src/v/model:batch_builder",
+        "//src/v/serde",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":lsm_update",
+        ":stm",
+        "//src/v/cloud_io:remote",
+        "//src/v/cloud_storage_clients",
+        "//src/v/lsm/core:exceptions",
+        "//src/v/lsm/io:persistence",
+        "//src/v/lsm/proto:manifest_redpanda_proto",
+        "//src/v/model",
+        "@seastar",
+    ],
+)
+
 redpanda_cc_library(
     name = "values",
     hdrs = [
diff --git a/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc b/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc
new file mode 100644
index 0000000000000..d81d9db2d3df2
--- /dev/null
+++ b/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc
@@ -0,0 +1,301 @@
+/*
+ * Copyright 2025 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+#include "cloud_topics/level_one/metastore/lsm/replicated_db.h"
+
+#include "cloud_topics/level_one/metastore/lsm/lsm_update.h"
+#include "cloud_topics/level_one/metastore/lsm/replicated_persistence.h"
+#include "cloud_topics/level_one/metastore/lsm/stm.h"
+#include "cloud_topics/logger.h"
+#include "lsm/io/cloud_persistence.h"
+#include "lsm/io/persistence.h"
+#include "lsm/proto/manifest.proto.h"
+#include "model/batch_builder.h"
+#include "model/record.h"
+#include "serde/rw/scalar.h"
+#include "ssx/clock.h"
+
+#include <seastar/coroutine/as_future.hh>
+
+namespace cloud_topics::l1 {
+
+namespace {
+replicated_database::errc map_stm_error(stm::errc e) {
+    switch (e) {
+    case stm::errc::not_leader:
+        return replicated_database::errc::not_leader;
+    case stm::errc::raft_error:
+        return replicated_database::errc::replication_error;
+    case stm::errc::shutting_down:
+        return replicated_database::errc::shutting_down;
+    }
+}
+} // namespace
+
+ss::future<std::expected<
+  std::unique_ptr<replicated_database>,
+  replicated_database::errc>>
+replicated_database::open(
+  stm* s,
+  const std::filesystem::path& staging_directory,
+  cloud_io::remote* remote,
+  const cloud_storage_clients::bucket_name& bucket,
+  ss::abort_source& as) {
+    auto term_result = co_await s->sync(std::chrono::seconds(30));
+    if (!term_result.has_value()) {
+        co_return std::unexpected(map_stm_error(term_result.error()));
+    }
+    auto term = term_result.value();
+    auto epoch = s->state().to_epoch(term);
+
+    vlog(
+      cd_log.info,
+      "Opening replicated LSM database for term {} with DB epoch {}",
+      term,
+      epoch);
+    if (s->state().domain_uuid().is_nil()) {
+        auto new_domain_uuid = domain_uuid(uuid_t::create());
+        vlog(
+          cd_log.info,
+          "Replicating new domain UUID {} in term {}",
+          new_domain_uuid,
+          term);
+        auto update = set_domain_uuid_update::build(
+          s->state(), new_domain_uuid);
+        model::batch_builder builder;
+        builder.set_batch_type(model::record_batch_type::l1_stm);
+        builder.add_record(
+          {.key = serde::to_iobuf(lsm_update_key::set_domain_uuid),
+           .value = serde::to_iobuf(update.value())});
+        auto batch = co_await std::move(builder).build();
+        auto replicate_result = co_await s->replicate_and_wait(
+          term, std::move(batch), as);
+
+        if (!replicate_result.has_value()) {
+            vlog(
+              cd_log.warn,
+              "Failed to replicate set_domain_uuid batch: {}",
+              int(replicate_result.error()));
+            co_return std::unexpected(map_stm_error(replicate_result.error()));
+        }
+        if (s->state().domain_uuid().is_nil()) {
+            co_return std::unexpected(errc::replication_error);
+        }
+    }
+    auto domain_uuid = s->state().domain_uuid;
+    cloud_storage_clients::object_key domain_prefix{
+      fmt::format("{}", domain_uuid())};
+
+    auto data_persist = co_await lsm::io::open_cloud_data_persistence(
+      staging_directory, remote, bucket, domain_prefix);
+    auto meta_persist = co_await open_replicated_metadata_persistence(
+      s, remote, bucket, domain_uuid, domain_prefix);
+    lsm::io::persistence io{
+      .data = std::move(data_persist),
+      .metadata = std::move(meta_persist),
+    };
+
+    // Open the LSM database using the persisted manifest from the STM.
+    auto db = co_await lsm::database::open(
+      lsm::options{
+        .database_epoch = epoch(),
+        // TODO: tuning.
+      },
+      std::move(io));
+
+    // Replay the writes in the volatile_buffer as writes to the database.
+    // These are writes that were replicated but not yet persisted to the
+    // manifest.
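+    // For example, if the manifest already covers seqnos up to 5 and the
+    // volatile buffer holds rows at seqnos 4 through 7, only the rows at
+    // seqnos 6 and 7 are re-applied below (illustrative numbers).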
+    auto max_persisted_seqno = db.max_persisted_seqno();
+    if (!s->state().volatile_buffer.empty()) {
+        vlog(
+          cd_log.info,
+          "Applying {} volatile writes to LSM database",
+          s->state().volatile_buffer.size());
+
+        auto wb = db.create_write_batch();
+        size_t num_written = 0;
+        for (const auto& row : s->state().volatile_buffer) {
+            auto seqno = row.seqno;
+            if (seqno <= max_persisted_seqno) {
+                continue;
+            }
+            wb.put(row.row.key, row.row.value.copy(), seqno);
+            vlog(
+              cd_log.trace,
+              "Replaying at seqno: {}, key: {}",
+              seqno,
+              row.row.key);
+            ++num_written;
+        }
+        if (num_written > 0) {
+            auto write_fut = co_await ss::coroutine::as_future(
+              db.apply(std::move(wb)));
+            if (write_fut.failed()) {
+                auto ex = write_fut.get_exception();
+                vlog(cd_log.error, "Failed to apply volatile writes: {}", ex);
+                co_return std::unexpected(errc::io_error);
+            }
+        }
+    }
+    co_return std::unique_ptr<replicated_database>(
+      new replicated_database(term, domain_uuid, s, std::move(db), as));
+}
+
+ss::future<std::expected<void, replicated_database::errc>>
+replicated_database::close() {
+    auto fut = co_await ss::coroutine::as_future(db_.close());
+    if (fut.failed()) {
+        auto ex = fut.get_exception();
+        if (ssx::is_shutdown_exception(ex)) {
+            co_return std::unexpected(errc::shutting_down);
+        }
+        co_return std::unexpected(errc::io_error);
+    }
+    co_return std::expected<void, errc>{};
+}
+
+bool replicated_database::needs_reopen() const {
+    return !stm_->raft()->is_leader() || term_ != stm_->raft()->confirmed_term()
+           || get_domain_uuid() != expected_domain_uuid_;
+}
+
+ss::future<std::expected<void, replicated_database::errc>>
+replicated_database::write(chunked_vector<write_batch_row> rows) {
+    if (rows.empty()) {
+        co_return std::expected<void, errc>{};
+    }
+
+    auto update = apply_write_batch_update::build(
+      stm_->state(), expected_domain_uuid_, std::move(rows));
+
+    if (!update.has_value()) {
+        vlog(
+          cd_log.warn,
+          "Failed to build write batch update: {}",
+          update.error());
+        co_return std::unexpected(errc::replication_error);
+    }
+
+    model::batch_builder builder;
+    builder.set_batch_type(model::record_batch_type::l1_stm);
+    builder.add_record(
+      {.key = serde::to_iobuf(lsm_update_key::apply_write_batch),
+       .value = serde::to_iobuf(update.value().copy())});
+    auto batch = co_await std::move(builder).build();
+
+    auto replicate_result = co_await stm_->replicate_and_wait(
+      term_, std::move(batch), as_);
+
+    if (!replicate_result.has_value()) {
+        vlog(
+          cd_log.warn,
+          "Failed to replicate write batch: {}",
+          int(replicate_result.error()));
+        co_return std::unexpected(map_stm_error(replicate_result.error()));
+    }
+
+    auto wb = db_.create_write_batch();
+    const auto seqno_delta = stm_->state().seqno_delta;
+    auto seqno = lsm::sequence_number(replicate_result.value()() + seqno_delta);
+    for (const auto& row : update.value().rows) {
+        vlog(cd_log.trace, "Applying at seqno: {}, key: {}", seqno, row.key);
+        if (row.value.empty()) {
+            wb.remove(row.key, seqno);
+        } else {
+            wb.put(row.key, row.value.copy(), seqno);
+        }
+    }
+
+    auto write_fut = co_await ss::coroutine::as_future(
+      db_.apply(std::move(wb)));
+    if (write_fut.failed()) {
+        auto ex = write_fut.get_exception();
+        vlog(cd_log.error, "Failed to write to LSM database: {}", ex);
+        co_return std::unexpected(errc::io_error);
+    }
+
+    co_return std::expected<void, errc>{};
+}
+
+ss::future<std::expected<void, replicated_database::errc>>
+replicated_database::reset(
+  domain_uuid uuid, std::optional<lsm::proto::manifest> manifest) {
+    if (uuid == get_domain_uuid()) {
+        co_return std::expected<void, errc>{};
+    }
+    std::optional<lsm_state::serialized_manifest> serialized_man;
+    if (manifest) {
+        serialized_man = lsm_state::serialized_manifest{
+          .buf = co_await manifest->to_proto(),
+          .last_seqno = lsm::sequence_number(manifest->get_last_seqno()),
+          .database_epoch = lsm::internal::database_epoch(
+            manifest->get_database_epoch()),
+        };
+    }
+    auto update = reset_manifest_update::build(
+      stm_->state(), uuid, std::move(serialized_man));
+
+    if (!update.has_value()) {
+        vlog(
+          cd_log.warn,
+          "Failed to build reset_manifest update: {}",
+          update.error());
+        co_return std::unexpected(errc::replication_error);
+    }
+
+    model::batch_builder builder;
+    builder.set_batch_type(model::record_batch_type::l1_stm);
+    builder.add_record(
+      {.key = serde::to_iobuf(lsm_update_key::reset_manifest),
+       .value = serde::to_iobuf(std::move(update.value()))});
+    auto batch = co_await std::move(builder).build();
+
+    auto replicate_result = co_await stm_->replicate_and_wait(
+      term_, std::move(batch), as_);
+
+    if (!replicate_result.has_value()) {
+        vlog(
+          cd_log.warn,
+          "Failed to replicate manifest reset: {}",
+          int(replicate_result.error()));
+        co_return std::unexpected(map_stm_error(replicate_result.error()));
+    }
+    if (uuid != get_domain_uuid()) {
+        vlog(
+          cd_log.warn,
+          "Domain UUID doesn't match after replication: {} vs expected {}",
+          get_domain_uuid(),
+          uuid);
+        co_return std::unexpected(errc::replication_error);
+    }
+
+    co_return std::expected<void, errc>{};
+}
+
+domain_uuid replicated_database::get_domain_uuid() const {
+    return stm_->state().domain_uuid;
+}
+
+ss::future<std::expected<void, replicated_database::errc>>
+replicated_database::flush(std::optional<std::chrono::milliseconds> timeout) {
+    auto deadline = timeout ? ssx::lowres_steady_clock().now()
+                                + ssx::duration::from_chrono(*timeout)
+                            : ssx::instant::infinite_future();
+    auto flush_fut = co_await ss::coroutine::as_future(db_.flush(deadline));
+    if (flush_fut.failed()) {
+        auto ex = flush_fut.get_exception();
+        vlog(cd_log.error, "Failed to flush to LSM database: {}", ex);
+        co_return std::unexpected(errc::io_error);
+    }
+    co_return std::expected<void, errc>{};
+}
+
+} // namespace cloud_topics::l1
diff --git a/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.h b/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.h
new file mode 100644
index 0000000000000..1ee27211c6b70
--- /dev/null
+++ b/src/v/cloud_topics/level_one/metastore/lsm/replicated_db.h
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2025 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+#pragma once
+
+#include "cloud_io/remote.h"
+#include "cloud_storage_clients/types.h"
+#include "cloud_topics/level_one/metastore/lsm/stm.h"
+#include "cloud_topics/level_one/metastore/lsm/write_batch_row.h"
+#include "container/chunked_vector.h"
+#include "lsm/io/persistence.h"
+#include "lsm/lsm.h"
+#include "lsm/proto/manifest.proto.h"
+#include "model/fundamental.h"
+
+#include <seastar/core/abort_source.hh>
+
+#include <filesystem>
+
+namespace cloud_topics::l1 {
+
+class stm;
+
+// Leader-only interface for interacting with a replicated LSM database.
+//
+// There is no concurrency control applied by this class at the row level.
+// Callers are expected to coordinate (e.g. via locking) to ensure updates to
+// the same rows are performed in the desired order.
+class replicated_database {
+public:
+    enum class errc {
+        io_error,
+        replication_error,
+        not_leader,
+        shutting_down,
+    };
+
+    // Opens a replicated database for the leader of this term.
+    //
+    // This method:
+    // 1. Syncs the STM to ensure we have the latest state, assuming this is
+    //    the first time opening the database as leader in the current term
+    // 2. Creates cloud persistence for data and metadata
+    // 3. Opens the LSM database using the persisted manifest from the STM
+    // 4. Applies any volatile_buffer writes to the database, catching up the
+    //    database to the state as of the end of the previous leadership
+    //
+    // It is expected that this is called by the leader once in a given term,
+    // before any LSM state updates are replicated in this term.
+    static ss::future<
+      std::expected<std::unique_ptr<replicated_database>, errc>>
+    open(
+      stm* s,
+      const std::filesystem::path& staging_directory,
+      cloud_io::remote* remote,
+      const cloud_storage_clients::bucket_name& bucket,
+      ss::abort_source& as);
+
+    replicated_database(replicated_database&&) = default;
+    ~replicated_database() = default;
+    ss::future<std::expected<void, errc>> close();
+    bool needs_reopen() const;
+
+    // Builds a write batch for the given rows, replicates it to the STM, and
+    // upon success, applies it to the local database.
+    //
+    // If a replication error is returned, it's possible that the replication
+    // call timed out but the write was still replicated to the STM. To ensure
+    // the update actually happened, callers should either retry, or step down
+    // as leader to ensure the next leader picks up any potentially timed-out
+    // writes.
+    ss::future<std::expected<void, errc>>
+    write(chunked_vector<write_batch_row> rows);
+
+    // Resets the underlying STM to the given manifest.
+    // NOTE: once this is called, this database instance must not be used.
+    ss::future<std::expected<void, errc>>
+    reset(domain_uuid, std::optional<lsm::proto::manifest> manifest);
+
+    domain_uuid get_domain_uuid() const;
+    ss::future<std::expected<void, errc>>
+    flush(std::optional<std::chrono::milliseconds> = std::nullopt);
+
+    lsm::database& db() { return db_; }
+
+private:
+    replicated_database(
+      model::term_id term,
+      domain_uuid domain_uuid,
+      stm* s,
+      lsm::database db,
+      ss::abort_source& as)
+      : term_(term)
+      , expected_domain_uuid_(domain_uuid)
+      , stm_(s)
+      , db_(std::move(db))
+      , as_(as) {}
+
+    // All replication happens with this term as the invariant.
+    const model::term_id term_;
+
+    // The domain UUID that this database is managing. If this diverges from
+    // the domain UUID in the STM, this replicated_database is no longer
+    // usable and callers should open a new one.
+    const domain_uuid expected_domain_uuid_;
+
+    // STM for replication and state access.
+    stm* stm_;
+
+    // The underlying LSM database.
+    lsm::database db_;
+
+    ss::abort_source& as_;
+};
+
+} // namespace cloud_topics::l1
+
+template<>
+struct fmt::formatter<cloud_topics::l1::replicated_database::errc> final
+  : fmt::formatter<std::string_view> {
+    template<typename FormatContext>
+    auto format(
+      const cloud_topics::l1::replicated_database::errc& k,
+      FormatContext& ctx) const {
+        switch (k) {
+            using enum cloud_topics::l1::replicated_database::errc;
+        case io_error:
+            return formatter<std::string_view>::format("io_error", ctx);
+        case replication_error:
+            return formatter<std::string_view>::format(
+              "replication_error", ctx);
+        case not_leader:
+            return formatter<std::string_view>::format("not_leader", ctx);
+        case shutting_down:
+            return formatter<std::string_view>::format("shutting_down", ctx);
+        }
+    }
+};
diff --git a/src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.cc b/src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.cc
new file mode 100644
index 0000000000000..866b8fd316b19
--- /dev/null
+++ b/src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.cc
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2025 Redpanda Data, Inc.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+#include "cloud_topics/level_one/metastore/lsm/replicated_persistence.h"
+
+#include "cloud_topics/level_one/metastore/lsm/lsm_update.h"
+#include "cloud_topics/level_one/metastore/lsm/stm.h"
+#include "lsm/core/exceptions.h"
+#include "lsm/io/cloud_persistence.h"
+#include "lsm/io/persistence.h"
+#include "lsm/lsm.h"
+#include "lsm/proto/manifest.proto.h"
+#include "model/batch_builder.h"
+
+namespace cloud_topics::l1 {
+
+namespace {
+
+class replicated_metadata_persistence : public lsm::io::metadata_persistence {
+public:
+    replicated_metadata_persistence(
+      stm* stm,
+      domain_uuid domain_uuid,
+      std::unique_ptr<lsm::io::metadata_persistence> cloud_persistence)
+      : _stm(stm)
+      , _expected_domain_uuid(domain_uuid)
+      , _cloud_persistence(std::move(cloud_persistence)) {}
+
+    ss::future<std::optional<iobuf>>
+    read_manifest(lsm::internal::database_epoch max_epoch) override {
+        _as.check();
+        auto _ = _gate.hold();
+        auto term_result = co_await _stm->sync(std::chrono::seconds(30), _as);
+        if (!term_result.has_value()) {
+            throw lsm::io_error_exception(
+              "Failed to sync when loading manifest");
+        }
+        if (_stm->state().domain_uuid != _expected_domain_uuid) {
+            // Caller needs to rebuild the metadata persistence with the
+            // appropriate prefix.
+            throw lsm::io_error_exception(
+              "Domain UUID has changed, likely due to domain recovery, "
+              "expected {}, got {}",
+              _expected_domain_uuid,
+              _stm->state().domain_uuid);
+        }
+        auto term = term_result.value();
+        auto stm_epoch = _stm->state().to_epoch(term);
+        if (stm_epoch > max_epoch) {
+            // Callers are expected to have opened the database with an epoch
+            // at or higher than what is in the STM.
+            throw lsm::io_error_exception(
+              "Can't load manifest above current epoch {}, STM epoch: {}",
+              max_epoch(),
+              stm_epoch);
+        }
+        if (!_stm->state().persisted_manifest.has_value()) {
+            // There is no persisted manifest.
+            co_return std::nullopt;
+        }
+        co_return _stm->state().persisted_manifest->buf.copy();
+    }
+
+    ss::future<>
+    write_manifest(lsm::internal::database_epoch epoch, iobuf b) override {
+        auto h = _gate.hold();
+        co_await _cloud_persistence->write_manifest(epoch, b.share());
+
+        // Now that the manifest has been persisted successfully, replicate
+        // to the log.
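+        // Note the ordering: the manifest is uploaded to cloud storage
+        // before it is replicated, so the copy in the bucket is always at
+        // least as new as the one read_manifest() serves from the STM. A
+        // failure between the two steps only leaves an unreferenced
+        // manifest in the bucket.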
+        auto m = co_await lsm::proto::manifest::from_proto(b.share());
+        auto serialized_man = lsm_state::serialized_manifest{
+          .buf = std::move(b),
+          .last_seqno = lsm::sequence_number(m.get_last_seqno()),
+          .database_epoch = lsm::internal::database_epoch(
+            m.get_database_epoch()),
+        };
+        auto update_res = persist_manifest_update::build(
+          _stm->state(), _expected_domain_uuid, std::move(serialized_man));
+        if (!update_res.has_value()) {
+            throw lsm::io_error_exception(
+              "Invalid persist_manifest update: {}", update_res.error());
+        }
+        model::batch_builder builder;
+        builder.set_batch_type(model::record_batch_type::l1_stm);
+        builder.add_record(
+          {.key = serde::to_iobuf(lsm_update_key::persist_manifest),
+           .value = serde::to_iobuf(std::move(update_res.value()))});
+        auto batch = co_await std::move(builder).build();
+
+        auto replicate_result = co_await _stm->replicate_and_wait(
+          _stm->state().to_term(epoch), std::move(batch), _as);
+
+        if (!replicate_result.has_value()) {
+            throw lsm::io_error_exception(
+              "Replication error after persisting manifest: {}",
+              int(replicate_result.error()));
+        }
+    }
+
+    ss::future<> close() override {
+        _as.request_abort();
+        auto fut = _gate.close();
+        auto persistence_fut = _cloud_persistence->close();
+        co_await std::move(persistence_fut);
+        co_await std::move(fut);
+    }
+
+private:
+    ss::gate _gate;
+    ss::abort_source _as;
+    stm* _stm;
+    domain_uuid _expected_domain_uuid;
+    std::unique_ptr<lsm::io::metadata_persistence> _cloud_persistence;
+};
+
+} // namespace
+
+ss::future<std::unique_ptr<lsm::io::metadata_persistence>>
+open_replicated_metadata_persistence(
+  stm* stm,
+  cloud_io::remote* remote,
+  cloud_storage_clients::bucket_name bucket,
+  domain_uuid domain_uuid,
+  const cloud_storage_clients::object_key& prefix) {
+    auto cloud_persistence = co_await lsm::io::open_cloud_metadata_persistence(
+      remote, bucket, prefix);
+    co_return std::make_unique<replicated_metadata_persistence>(
+      stm, domain_uuid, std::move(cloud_persistence));
+}
+
+} // namespace cloud_topics::l1
diff --git a/src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.h b/src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.h
new file mode 100644
index 0000000000000..5f6ff9a688041
--- /dev/null
+++ b/src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.h
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2025 Redpanda Data, Inc.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+#pragma once
+
+#include "cloud_io/remote.h"
+#include "cloud_storage_clients/types.h"
+#include "cloud_topics/level_one/metastore/lsm/stm.h"
+#include "lsm/io/persistence.h"
+#include "model/fundamental.h"
+
+namespace cloud_topics::l1 {
+
+// Open a metadata persistence object in the given bucket at the prefix. In
+// addition to writing to cloud, this metadata persistence replicates the
+// manifest via the replicated state machine.
+//
+// The caller is expected to be the current leader of the Raft term; reads
+// will return an error if the manifest returned from the state machine
+// corresponds to a higher term (implying that the caller is no longer
+// leader).
+//
+// The input domain UUID and object storage prefix are expected to correspond
+// to one another. If the state machine points to a different domain (e.g.
+// after recovery), this metadata persistence will subsequently return errors,
+// with the expectation that it will be closed and reopened.
+ss::future<std::unique_ptr<lsm::io::metadata_persistence>>
+open_replicated_metadata_persistence(
+  stm* stm,
+  cloud_io::remote* remote,
+  cloud_storage_clients::bucket_name bucket,
+  domain_uuid domain_uuid,
+  const cloud_storage_clients::object_key& prefix);
+
+} // namespace cloud_topics::l1
diff --git a/src/v/cloud_topics/level_one/metastore/lsm/state.cc b/src/v/cloud_topics/level_one/metastore/lsm/state.cc
index ea508ac7e2dbc..16b874295608f 100644
--- a/src/v/cloud_topics/level_one/metastore/lsm/state.cc
+++ b/src/v/cloud_topics/level_one/metastore/lsm/state.cc
@@ -57,4 +57,20 @@ lsm_state lsm_state::copy() const {
     };
 }
 
+model::term_id lsm_state::to_term(lsm::internal::database_epoch e) const {
+    return model::term_id(e() - db_epoch_delta);
+}
+
+lsm::internal::database_epoch lsm_state::to_epoch(model::term_id t) const {
+    return lsm::internal::database_epoch(t() + db_epoch_delta);
+}
+
+kafka::offset lsm_state::to_offset(lsm::sequence_number s) const {
+    return kafka::offset(s() - seqno_delta);
+}
+
+lsm::sequence_number lsm_state::to_seqno(kafka::offset o) const {
+    return lsm::sequence_number(o() + seqno_delta);
+}
+
 } // namespace cloud_topics::l1
diff --git a/src/v/cloud_topics/level_one/metastore/lsm/state.h b/src/v/cloud_topics/level_one/metastore/lsm/state.h
index e64335715b3f2..1f3fe7bdbf6da 100644
--- a/src/v/cloud_topics/level_one/metastore/lsm/state.h
+++ b/src/v/cloud_topics/level_one/metastore/lsm/state.h
@@ -13,6 +13,7 @@
 #include "cloud_topics/level_one/metastore/domain_uuid.h"
 #include "cloud_topics/level_one/metastore/lsm/write_batch_row.h"
 #include "lsm/lsm.h"
+#include "model/fundamental.h"
 #include "serde/envelope.h"
 
 #include
 
@@ -58,6 +59,12 @@
     }
     lsm_state copy() const;
 
+    // Conversion between Redpanda space and LSM DB space.
+    model::term_id to_term(lsm::internal::database_epoch) const;
+    lsm::internal::database_epoch to_epoch(model::term_id) const;
+    kafka::offset to_offset(lsm::sequence_number) const;
+    lsm::sequence_number to_seqno(kafka::offset) const;
+
     // The unique identifier for this LSM state. This should be used as the
     // basis for where the database writes its data and metadata.
     //
diff --git a/src/v/cloud_topics/level_one/metastore/lsm/stm.cc b/src/v/cloud_topics/level_one/metastore/lsm/stm.cc
index 81ad94fbde6f2..c8b0f0cd33c20 100644
--- a/src/v/cloud_topics/level_one/metastore/lsm/stm.cc
+++ b/src/v/cloud_topics/level_one/metastore/lsm/stm.cc
@@ -56,10 +56,11 @@ stm::stm(
     snapshot_timer_.set_callback([this] { write_snapshot_async(); });
 }
 
-ss::future<std::expected<model::term_id, stm::errc>>
-stm::sync(model::timeout_clock::duration timeout) {
+ss::future<std::expected<model::term_id, stm::errc>> stm::sync(
+  model::timeout_clock::duration timeout,
+  std::optional<std::reference_wrapper<ss::abort_source>> as) {
     auto sync_res = co_await ss::coroutine::as_future(
-      metastore_stm_base::sync(timeout));
+      metastore_stm_base::sync(timeout, as));
     if (sync_res.failed()) {
         auto eptr = sync_res.get_exception();
         auto msg = fmt::format("Exception caught while syncing: {}", eptr);
diff --git a/src/v/cloud_topics/level_one/metastore/lsm/stm.h b/src/v/cloud_topics/level_one/metastore/lsm/stm.h
index 963f36ac15b9b..8ae4920080fe4 100644
--- a/src/v/cloud_topics/level_one/metastore/lsm/stm.h
+++ b/src/v/cloud_topics/level_one/metastore/lsm/stm.h
@@ -41,8 +41,10 @@ class stm final : public metastore_stm_base {
     // leaders that they are up-to-date.
     //
     // Returns the current term.
-    ss::future<std::expected<model::term_id, errc>>
-    sync(model::timeout_clock::duration timeout);
+    ss::future<std::expected<model::term_id, errc>> sync(
+      model::timeout_clock::duration timeout,
+      std::optional<std::reference_wrapper<ss::abort_source>> as
+      = std::nullopt);
 
     // Replicates the given batch and waits for it to finish replicating.
     // Success here does not guarantee that the replicated operation succeeded
diff --git a/src/v/cloud_topics/level_one/metastore/lsm/tests/BUILD b/src/v/cloud_topics/level_one/metastore/lsm/tests/BUILD
index 793cdfdaeab06..a266502e6938f 100644
--- a/src/v/cloud_topics/level_one/metastore/lsm/tests/BUILD
+++ b/src/v/cloud_topics/level_one/metastore/lsm/tests/BUILD
@@ -54,3 +54,33 @@ redpanda_cc_gtest(
         "@seastar",
     ],
 )
+
+redpanda_cc_gtest(
+    name = "replicated_db_test",
+    timeout = "moderate",
+    srcs = [
+        "replicated_db_test.cc",
+    ],
+    cpu = 1,
+    deps = [
+        "//src/v/cloud_io:remote",
+        "//src/v/cloud_io/tests:s3_imposter",
+        "//src/v/cloud_io/tests:scoped_remote",
+        "//src/v/cloud_storage_clients",
+        "//src/v/cloud_topics/level_one/metastore/lsm:replicated_db",
+        "//src/v/cloud_topics/level_one/metastore/lsm:stm",
+        "//src/v/cloud_topics/level_one/metastore/lsm:write_batch_row",
+        "//src/v/config",
+        "//src/v/container:chunked_vector",
+        "//src/v/lsm/io:cloud_persistence",
+        "//src/v/lsm/io:persistence",
+        "//src/v/model",
+        "//src/v/raft/tests:raft_fixture",
+        "//src/v/random:generators",
+        "//src/v/test_utils:gtest",
+        "//src/v/test_utils:scoped_config",
+        "//src/v/test_utils:tmp_dir",
+        "@googletest//:gtest",
+        "@seastar",
+    ],
+)
diff --git a/src/v/cloud_topics/level_one/metastore/lsm/tests/replicated_db_test.cc b/src/v/cloud_topics/level_one/metastore/lsm/tests/replicated_db_test.cc
new file mode 100644
index 0000000000000..c8b2742baf9a2
--- /dev/null
+++ b/src/v/cloud_topics/level_one/metastore/lsm/tests/replicated_db_test.cc
@@ -0,0 +1,658 @@
+/*
+ * Copyright 2025 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+
+#include "cloud_io/remote.h"
+#include "cloud_io/tests/s3_imposter.h"
+#include "cloud_io/tests/scoped_remote.h"
+#include "cloud_storage_clients/types.h"
+#include "cloud_topics/level_one/metastore/lsm/replicated_db.h"
+#include "cloud_topics/level_one/metastore/lsm/stm.h"
+#include "cloud_topics/level_one/metastore/lsm/write_batch_row.h"
+#include "config/node_config.h"
+#include "gmock/gmock.h"
+#include "lsm/io/cloud_persistence.h"
+#include "lsm/io/persistence.h"
+#include "model/fundamental.h"
+#include "raft/tests/raft_fixture.h"
+#include "random/generators.h"
+#include "test_utils/async.h"
+#include "test_utils/scoped_config.h"
+#include "test_utils/tmp_dir.h"
+
+#include <seastar/core/sleep.hh>
+
+#include <gtest/gtest.h>
+
+#include <list>
+
+using namespace cloud_topics::l1;
+using testing::ElementsAre;
+
+namespace {
+
+ss::logger rdb_test_log("replicated_db_test");
+
+MATCHER_P2(MatchesKV, key_str, val_str, "") {
+    return arg.key == key_str && arg.value == iobuf::from(val_str);
+}
+
+MATCHER_P2(MatchesRow, key_str, val_str, "") {
+    return arg.row.key == key_str && arg.row.value == iobuf::from(val_str);
+}
+
+ss::future<chunked_vector<write_batch_row>> get_rows(lsm::database& db) {
+    chunked_vector<write_batch_row> ret;
+    auto it = co_await db.create_iterator();
+    co_await it.seek_to_first();
+    while (it.valid()) {
+        ret.emplace_back(
+          write_batch_row{
+            .key = ss::sstring(it.key()),
+            .value = it.value(),
+          });
+        co_await it.next();
+    }
+    co_return ret;
+}
+
+struct replicated_db_node {
+    replicated_db_node(
+      ss::shared_ptr<stm> s,
+      cloud_io::remote* remote,
+      const cloud_storage_clients::bucket_name& bucket,
+      const ss::sstring& staging_path)
+      : stm_ptr(std::move(s))
+      , remote(remote)
+      , bucket(bucket)
+      , staging_directory(staging_path.data()) {}
+
+    ss::future<std::expected<replicated_database*, replicated_database::errc>>
+    open_db() {
+        auto ret = co_await replicated_database::open(
+          stm_ptr.get(), staging_directory.get_path(), remote, bucket, as);
+        if (!ret.has_value()) {
+            co_return std::unexpected(ret.error());
+        }
+        auto ptr = ret.value().get();
+        dbs.push_back(std::move(ret.value()));
+        co_return ptr;
+    }
+
+    ss::future<> close() {
+        for (auto& db : dbs) {
+            auto res = co_await db->close();
+            if (!res.has_value()) {
+                vlog(
+                  rdb_test_log.warn,
+                  "Failed to close DB for node {}",
+                  stm_ptr->raft()->self());
+            }
+        }
+    }
+
+    ss::shared_ptr<stm> stm_ptr;
+    cloud_io::remote* remote;
+    const cloud_storage_clients::bucket_name& bucket;
+    temporary_dir staging_directory;
+    ss::abort_source as;
+    std::list<std::unique_ptr<replicated_database>> dbs;
+};
+
+} // namespace
+
+class ReplicatedDatabaseTest
+  : public raft::raft_fixture
+  , public s3_imposter_fixture {
+public:
+    static constexpr auto num_nodes = 3;
+    using opt_ref = std::optional<std::reference_wrapper<replicated_db_node>>;
+
+    void SetUp() override {
+        ss::smp::invoke_on_all([] {
+            config::node().node_id.set_value(model::node_id{1});
+        }).get();
+        cfg.get("raft_heartbeat_interval_ms").set_value(50ms);
+        cfg.get("raft_heartbeat_timeout_ms").set_value(500ms);
+
+        set_expectations_and_listen({});
+        sr = cloud_io::scoped_remote::create(10, conf);
+
+        raft::raft_fixture::SetUpAsync().get();
+
+        // Create our STMs.
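+        // Each node gets its own raft instance, an lsm stm registered with
+        // the state machine manager, and a private staging directory.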
+        for (auto i = 0; i < num_nodes; ++i) {
+            add_node(model::node_id(i), model::revision_id(0));
+        }
+        for (auto& [id, node] : nodes()) {
+            node->initialise(all_vnodes()).get();
+            auto* raft = node->raft().get();
+            raft::state_machine_manager_builder builder;
+            auto s = builder.create_stm<stm>(
+              rdb_test_log,
+              raft,
+              config::mock_binding(1s));
+
+            node->start(std::move(builder)).get();
+
+            // Create staging directory for this node.
+            auto staging_path = fmt::format("replicated_db_test_{}", id());
+            db_nodes.at(id()) = std::make_unique<replicated_db_node>(
+              std::move(s), &sr->remote.local(), bucket_name, staging_path);
+        }
+        opt_ref leader;
+        ASSERT_NO_FATAL_FAILURE(wait_for_leader(leader).get());
+        initial_leader = &leader->get();
+
+        auto db_res = initial_leader->open_db().get();
+        ASSERT_TRUE(db_res.has_value());
+        initial_db = db_res.value();
+    }
+
+    void TearDown() override {
+        for (auto& node : db_nodes) {
+            if (node) {
+                node->close().get();
+            }
+        }
+        raft::raft_fixture::TearDownAsync().get();
+        sr.reset();
+    }
+
+    // Returns the node on the current leader.
+    opt_ref leader_node() {
+        auto leader_id = get_leader();
+        if (!leader_id.has_value()) {
+            return std::nullopt;
+        }
+        auto& node = *db_nodes.at(leader_id.value()());
+        if (!node.stm_ptr->raft()->is_leader()) {
+            return std::nullopt;
+        }
+        return node;
+    }
+
+    // Waits for a leader to be elected, and returns it.
+    ss::future<> wait_for_leader(opt_ref& leader) {
+        RPTEST_REQUIRE_EVENTUALLY_CORO(10s, [&] {
+            leader = leader_node();
+            return leader.has_value();
+        });
+    }
+
+    // Waits for all nodes to have applied the current committed offset.
+    ss::future<> wait_for_apply() {
+        model::offset committed_offset{};
+        for (auto& n : nodes()) {
+            committed_offset = std::max(
+              committed_offset, n.second->raft()->committed_offset());
+        }
+
+        co_await parallel_for_each_node([committed_offset](auto& node) {
+            return node.raft()->stm_manager()->wait(
+              committed_offset, model::no_timeout);
+        });
+    }
+
+    // Creates a manifest in the given epoch and domain with the given rows.
+    lsm::proto::manifest create_manifest(
+      uint64_t db_epoch,
+      domain_uuid domain_uuid,
+      chunked_vector<volatile_row> rows) {
+        auto domain_prefix = cloud_storage_clients::object_key{
+          fmt::format("{}", domain_uuid)};
+        temporary_dir tmp("lsm_staging_scratch");
+        auto cloud_db
+          = lsm::database::open(
+              {.database_epoch = db_epoch},
+              lsm::io::persistence{
+                .data = lsm::io::open_cloud_data_persistence(
+                          tmp.get_path(),
+                          &sr->remote.local(),
+                          bucket_name,
+                          domain_prefix)
+                          .get(),
+                .metadata = lsm::io::open_cloud_metadata_persistence(
+                              &sr->remote.local(), bucket_name, domain_prefix)
+                              .get()})
+              .get();
+
+        // Write some data and then flush to a new manifest.
+        auto wb = cloud_db.create_write_batch();
+        for (auto& r : rows) {
+            wb.put(r.row.key, std::move(r.row.value), r.seqno);
+        }
+        cloud_db.apply(std::move(wb)).get();
+        cloud_db.flush(ssx::instant::infinite_future()).get();
+        cloud_db.close().get();
+        auto cloud_meta_persistence = lsm::io::open_cloud_metadata_persistence(
+                                        &sr->remote.local(),
+                                        bucket_name,
+                                        domain_prefix)
+                                        .get();
+        auto cloud_buf = cloud_meta_persistence
+                           ->read_manifest(lsm::internal::database_epoch::max())
+                           .get();
+        std::optional<lsm::proto::manifest> manifest;
+        if (cloud_buf) {
+            manifest
+              = lsm::proto::manifest::from_proto(std::move(*cloud_buf)).get();
+        }
+        EXPECT_TRUE(manifest.has_value());
+        return std::move(*manifest);
+    }
+
+    std::array<std::unique_ptr<replicated_db_node>, num_nodes> db_nodes;
+    scoped_config cfg;
+    std::unique_ptr<cloud_io::scoped_remote> sr;
+
+    // Initial leader and a database opened on that leader.
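+    // Both are raw pointers into db_nodes, which owns the underlying
+    // objects and closes them in TearDown().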
+    replicated_db_node* initial_leader;
+    replicated_database* initial_db;
+};
+
+// Test that if we're able to open the database, we will have already assigned
+// a domain UUID and it matches on all nodes.
+TEST_F(ReplicatedDatabaseTest, TestMatchingDomainUUID) {
+    wait_for_apply().get();
+    auto domain_uuid = initial_db->get_domain_uuid();
+    for (const auto& node : db_nodes) {
+        EXPECT_EQ(node->stm_ptr->state().domain_uuid, domain_uuid);
+    }
+}
+
+// Basic test for writing. Once we write, we can read the rows back. Once we
+// are no longer leader, we can no longer write. The new leader should be able
+// to read the rows from the previous term.
+TEST_F(ReplicatedDatabaseTest, TestBasicWrites) {
+    // Write some initial data.
+    chunked_vector<write_batch_row> rows1;
+    rows1.emplace_back(
+      write_batch_row{
+        .key = "key1",
+        .value = iobuf::from("value1"),
+      });
+    auto write_res = initial_db->write(std::move(rows1)).get();
+    ASSERT_TRUE(write_res.has_value());
+    wait_for_apply().get();
+    EXPECT_THAT(
+      get_rows(initial_db->db()).get(),
+      ElementsAre(MatchesKV("key1", "value1")));
+
+    // Elect a new leader.
+    initial_leader->stm_ptr->raft()->step_down("test").get();
+    opt_ref leader_opt;
+    ASSERT_NO_FATAL_FAILURE(wait_for_leader(leader_opt).get());
+    auto& new_leader = leader_opt->get();
+
+    // Open database on the new leader.
+    auto new_db_result = new_leader.open_db().get();
+    ASSERT_TRUE(new_db_result.has_value());
+    auto& new_db = *new_db_result.value();
+
+    // We should see the existing rows...
+    EXPECT_THAT(
+      get_rows(new_db.db()).get(), ElementsAre(MatchesKV("key1", "value1")));
+
+    // ...and be able to write more.
+    chunked_vector<write_batch_row> rows2;
+    rows2.emplace_back(
+      write_batch_row{
+        .key = "key2",
+        .value = iobuf::from("value2"),
+      });
+    write_res = new_db.write(std::move(rows2)).get();
+    ASSERT_TRUE(write_res.has_value());
+    EXPECT_THAT(
+      get_rows(new_db.db()).get(),
+      ElementsAre(MatchesKV("key1", "value1"), MatchesKV("key2", "value2")));
+
+    // Validate that all nodes see the same state.
+    wait_for_apply().get();
+    for (const auto& node : db_nodes) {
+        EXPECT_THAT(
+          node->stm_ptr->state().volatile_buffer,
+          ElementsAre(
+            MatchesRow("key1", "value1"), MatchesRow("key2", "value2")));
+    }
+}
+
+// Basic test for flushing. Once we flush, we can still read the rows back, but
+// shouldn't see these as volatile rows in the STM. This should be true for
+// subsequent leaders.
+TEST_F(ReplicatedDatabaseTest, TestBasicFlush) {
+    // Write some initial data.
+    chunked_vector<write_batch_row> rows1;
+    rows1.emplace_back(
+      write_batch_row{
+        .key = "key1",
+        .value = iobuf::from("value1"),
+      });
+    auto write_res = initial_db->write(std::move(rows1)).get();
+    ASSERT_TRUE(write_res.has_value());
+    wait_for_apply().get();
+    EXPECT_THAT(
+      get_rows(initial_db->db()).get(),
+      ElementsAre(MatchesKV("key1", "value1")));
+
+    // Flush and validate that we have nothing in the resulting buffer.
+    auto flush_res = initial_db->flush().get();
+    ASSERT_TRUE(flush_res.has_value());
+
+    // The rows should be visible through the database, and shouldn't exist in
+    // the volatile buffer since they've been flushed.
+    EXPECT_THAT(
+      get_rows(initial_db->db()).get(),
+      ElementsAre(MatchesKV("key1", "value1")));
+    wait_for_apply().get();
+    for (const auto& node : db_nodes) {
+        EXPECT_THAT(node->stm_ptr->state().volatile_buffer, ElementsAre());
+    }
+
+    // Write some more.
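+    // These rows land after the flushed manifest, so they (and not key1)
+    // should show up in the volatile buffer.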
+    chunked_vector<write_batch_row> rows2;
+    rows2.emplace_back(
+      write_batch_row{
+        .key = "key2",
+        .value = iobuf::from("value2"),
+      });
+    write_res = initial_db->write(std::move(rows2)).get();
+    ASSERT_TRUE(write_res.has_value());
+    EXPECT_THAT(
+      get_rows(initial_db->db()).get(),
+      ElementsAre(MatchesKV("key1", "value1"), MatchesKV("key2", "value2")));
+
+    // The volatile buffer only includes the new rows on all replicas.
+    wait_for_apply().get();
+    for (const auto& node : db_nodes) {
+        EXPECT_THAT(
+          node->stm_ptr->state().volatile_buffer,
+          ElementsAre(MatchesRow("key2", "value2")));
+    }
+
+    // Opening the database on a new leader should yield all the rows.
+    initial_leader->stm_ptr->raft()->step_down("test").get();
+    opt_ref leader_opt;
+    ASSERT_NO_FATAL_FAILURE(wait_for_leader(leader_opt).get());
+    auto& new_leader = leader_opt->get();
+    auto db_res = new_leader.open_db().get();
+    ASSERT_TRUE(db_res.has_value());
+    auto& new_db = *db_res.value();
+    EXPECT_THAT(
+      get_rows(new_db.db()).get(),
+      ElementsAre(MatchesKV("key1", "value1"), MatchesKV("key2", "value2")));
+}
+
+// A flush with nothing in the db is a no-op.
+TEST_F(ReplicatedDatabaseTest, TestEmptyFlush) {
+    auto flush_res = initial_db->flush().get();
+    EXPECT_TRUE(flush_res.has_value());
+    EXPECT_FALSE(
+      initial_leader->stm_ptr->state().persisted_manifest.has_value());
+}
+
+// Test that we can reset the manifest from an input manifest.
+TEST_F(ReplicatedDatabaseTest, TestReset) {
+    // Get the initial domain UUID.
+    auto initial_uuid = initial_db->get_domain_uuid();
+    ASSERT_FALSE(initial_uuid().is_nil());
+
+    // Create a manifest under a new domain UUID.
+    auto new_uuid = domain_uuid(uuid_t::create());
+    ASSERT_NE(initial_uuid, new_uuid);
+
+    const auto new_seqno = lsm::sequence_number{12345};
+    chunked_vector<volatile_row> initial_rows;
+    initial_rows.emplace_back(
+      volatile_row{
+        .seqno = new_seqno,
+        .row = write_batch_row{
+          .key = "key_before_reset",
+          .value = iobuf::from("value_before_reset"),
+        },
+      });
+    auto manifest = create_manifest(6789, new_uuid, std::move(initial_rows));
+
+    // Reset the STM from the manifest.
+    auto reset_res = initial_db->reset(new_uuid, std::move(manifest)).get();
+    ASSERT_TRUE(reset_res.has_value());
+
+    // Verify the domain UUID was updated and that the data is what we expect.
+    ASSERT_EQ(initial_db->get_domain_uuid(), new_uuid);
+    ASSERT_EQ(initial_leader->stm_ptr->state().domain_uuid, new_uuid);
+    ASSERT_TRUE(initial_db->needs_reopen());
+
+    auto db_res = initial_leader->open_db().get();
+    ASSERT_TRUE(db_res.has_value());
+    auto& reopened_db = *db_res.value();
+    EXPECT_EQ(new_seqno, reopened_db.db().max_persisted_seqno());
+    EXPECT_EQ(new_seqno, reopened_db.db().max_applied_seqno());
+
+    EXPECT_THAT(
+      get_rows(reopened_db.db()).get(),
+      ElementsAre(MatchesKV("key_before_reset", "value_before_reset")));
+
+    ASSERT_GT(initial_leader->stm_ptr->state().seqno_delta, 0);
+    ASSERT_EQ(0, initial_leader->stm_ptr->state().volatile_buffer.size());
+
+    // Write more with the reopened database and validate again.
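+    // The reset installed a positive seqno_delta (asserted above), so new
+    // writes should be assigned seqnos above the manifest's last_seqno.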
+    chunked_vector<write_batch_row> rows;
+    rows.emplace_back(
+      write_batch_row{
+        .key = "key_after_reset",
+        .value = iobuf::from("value_after_reset"),
+      });
+    auto write_result = reopened_db.write(std::move(rows)).get();
+    ASSERT_TRUE(write_result.has_value());
+    ASSERT_EQ(1, initial_leader->stm_ptr->state().volatile_buffer.size());
+    EXPECT_THAT(
+      get_rows(reopened_db.db()).get(),
+      ElementsAre(
+        MatchesKV("key_after_reset", "value_after_reset"),
+        MatchesKV("key_before_reset", "value_before_reset")));
+}
+
+TEST_F(ReplicatedDatabaseTest, TestFlushFailure) {
+    // Write some rows.
+    chunked_vector<write_batch_row> rows;
+    rows.emplace_back(
+      write_batch_row{
+        .key = "key1",
+        .value = iobuf::from("value1"),
+      });
+    auto write_res = initial_db->write(std::move(rows)).get();
+    ASSERT_TRUE(write_res.has_value());
+    wait_for_apply().get();
+    EXPECT_THAT(
+      get_rows(initial_db->db()).get(),
+      ElementsAre(MatchesKV("key1", "value1")));
+
+    // Step down as leader of the given term.
+    initial_leader->stm_ptr->raft()->step_down("test").get();
+    opt_ref leader_opt;
+    ASSERT_NO_FATAL_FAILURE(wait_for_leader(leader_opt).get());
+
+    // Still try to flush with the same initial database. This should fail
+    // gracefully (no hangs, crashes, etc).
+    auto flush_res = initial_db->flush(1s).get();
+    ASSERT_FALSE(flush_res.has_value());
+    EXPECT_EQ(flush_res.error(), replicated_database::errc::io_error);
+}
+
+TEST_F(ReplicatedDatabaseTest, TestResetWithEmptyManifest) {
+    auto initial_uuid = initial_db->get_domain_uuid();
+    ASSERT_FALSE(initial_uuid().is_nil());
+
+    auto new_uuid = domain_uuid(uuid_t::create());
+    ASSERT_NE(initial_uuid, new_uuid);
+
+    // Reset from an empty manifest.
+    auto reset_result = initial_db->reset(new_uuid, std::nullopt).get();
+    ASSERT_TRUE(reset_result.has_value());
+
+    // The domain UUID should still be updated.
+    ASSERT_EQ(initial_db->get_domain_uuid(), new_uuid);
+    ASSERT_EQ(initial_leader->stm_ptr->state().domain_uuid, new_uuid);
+    ASSERT_TRUE(initial_db->needs_reopen());
+
+    // Verify the database is still functional: write and read data.
+    chunked_vector<write_batch_row> rows;
+    rows.emplace_back(
+      write_batch_row{
+        .key = "key_after_reset",
+        .value = iobuf::from("value_after_reset"),
+      });
+
+    // Writes should fail, and we should need to reopen the database.
+    auto write_res = initial_db->write(std::move(rows)).get();
+    ASSERT_FALSE(write_res.has_value());
+    ASSERT_EQ(0, initial_leader->stm_ptr->state().volatile_buffer.size());
+    ASSERT_TRUE(initial_db->needs_reopen());
+
+    // Try again with the reopened database.
+    chunked_vector<write_batch_row> new_rows;
+    new_rows.emplace_back(
+      write_batch_row{
+        .key = "key_after_reset",
+        .value = iobuf::from("value_after_reset"),
+      });
+    auto db_res = initial_leader->open_db().get();
+    ASSERT_TRUE(db_res.has_value());
+    auto& reopened_db = *db_res.value();
+    write_res = reopened_db.write(std::move(new_rows)).get();
+    ASSERT_TRUE(write_res.has_value());
+    ASSERT_EQ(1, initial_leader->stm_ptr->state().volatile_buffer.size());
+    EXPECT_THAT(
+      get_rows(reopened_db.db()).get(),
+      ElementsAre(MatchesKV("key_after_reset", "value_after_reset")));
+}
+
+TEST_F(ReplicatedDatabaseTest, TestResetFailsNonEmpty) {
+    // Write a row.
+    chunked_vector<write_batch_row> rows;
+    rows.emplace_back(
+      write_batch_row{
+        .key = "existing_key",
+        .value = iobuf::from("existing_value"),
+      });
+    auto write_res = initial_db->write(std::move(rows)).get();
+    ASSERT_TRUE(write_res.has_value());
+    wait_for_apply().get();
+
+    // Create a manifest.
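+    // The manifest lives under a fresh domain UUID, so a successful reset
+    // would switch the database to a different object storage prefix.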
+    auto new_uuid = domain_uuid(uuid_t::create());
+    ASSERT_NE(initial_db->get_domain_uuid(), new_uuid);
+    chunked_vector<volatile_row> manifest_rows;
+    manifest_rows.emplace_back(
+      volatile_row{
+        .seqno = lsm::sequence_number{100},
+        .row = write_batch_row{
+          .key = "reset_key",
+          .value = iobuf::from("reset_value"),
+        },
+      });
+    auto manifest = create_manifest(999, new_uuid, std::move(manifest_rows));
+
+    // Attempt to reset with the manifest. This should fail since the STM is
+    // non-empty.
+    auto reset_res = initial_db->reset(new_uuid, std::move(manifest)).get();
+    ASSERT_FALSE(reset_res.has_value());
+
+    // Now flush.
+    auto flush_res = initial_db->flush().get();
+    ASSERT_TRUE(flush_res.has_value());
+    wait_for_apply().get();
+
+    // Try resetting again. This should still fail.
+    chunked_vector<volatile_row> new_rows;
+    new_rows.emplace_back(
+      volatile_row{
+        .seqno = lsm::sequence_number{100},
+        .row = write_batch_row{
+          .key = "reset_key",
+          .value = iobuf::from("reset_value"),
+        },
+      });
+    manifest = create_manifest(999, new_uuid, std::move(new_rows));
+    reset_res = initial_db->reset(new_uuid, std::move(manifest)).get();
+    ASSERT_FALSE(reset_res.has_value());
+
+    // Do a sanity check that the STM still has our expected row.
+    EXPECT_THAT(
+      get_rows(initial_db->db()).get(),
+      ElementsAre(MatchesKV("existing_key", "existing_value")));
+}
+
+TEST_F(ReplicatedDatabaseTest, TestConcurrentWrites) {
+    // Kick off writes in parallel.
+    constexpr int num_concurrent_writes = 5;
+    std::vector<ss::future<std::expected<void, replicated_database::errc>>>
+      write_futs;
+    write_futs.reserve(num_concurrent_writes);
+    for (int i = 0; i < num_concurrent_writes; ++i) {
+        chunked_vector<write_batch_row> rows;
+        rows.emplace_back(
+          write_batch_row{
+            .key = fmt::format("key{}", i),
+            .value = iobuf::from(fmt::format("value{}", i)),
+          });
+        write_futs.push_back(initial_db->write(std::move(rows)));
+    }
+    auto results
+      = ss::when_all_succeed(write_futs.begin(), write_futs.end()).get();
+
+    // Verify all writes succeeded.
+    for (const auto& result : results) {
+        ASSERT_TRUE(result.has_value());
+    }
+    EXPECT_THAT(
+      get_rows(initial_db->db()).get(),
+      ElementsAre(
+        MatchesKV("key0", "value0"),
+        MatchesKV("key1", "value1"),
+        MatchesKV("key2", "value2"),
+        MatchesKV("key3", "value3"),
+        MatchesKV("key4", "value4")));
+}
+
+// Test that concurrent flush and write operations execute correctly.
+TEST_F(ReplicatedDatabaseTest, TestConcurrentFlushAndWrite) {
+    // Start a fiber that repeatedly calls flush until we stop it.
+    ss::abort_source flush_as;
+    auto flush_fut = ss::do_until(
+      [&flush_as] { return flush_as.abort_requested(); },
+      [this] {
+          return initial_db->flush().then([](auto) { return ss::sleep(1ms); });
+      });
+
+    // Call write a few times.
+    for (int i = 0; i < 10; ++i) {
+        chunked_vector<write_batch_row> rows;
+        rows.emplace_back(
+          write_batch_row{
+            .key = fmt::format("key{}", i),
+            .value = iobuf::from(fmt::format("value{}", i)),
+          });
+        auto write_res = initial_db->write(std::move(rows)).get();
+        ASSERT_TRUE(write_res.has_value());
+    }
+
+    // Stop the flush fiber.
+    flush_as.request_abort();
+    flush_fut.get();
+
+    // Validate the rows match what we expect.
+    EXPECT_THAT(
+      get_rows(initial_db->db()).get(),
+      ElementsAre(
+        MatchesKV("key0", "value0"),
+        MatchesKV("key1", "value1"),
+        MatchesKV("key2", "value2"),
+        MatchesKV("key3", "value3"),
+        MatchesKV("key4", "value4"),
+        MatchesKV("key5", "value5"),
+        MatchesKV("key6", "value6"),
+        MatchesKV("key7", "value7"),
+        MatchesKV("key8", "value8"),
+        MatchesKV("key9", "value9")));
+}
diff --git a/src/v/lsm/BUILD b/src/v/lsm/BUILD
index 116266daeae46..a602705178a55 100644
--- a/src/v/lsm/BUILD
+++ b/src/v/lsm/BUILD
@@ -20,6 +20,7 @@ redpanda_cc_library(
     deps = [
         "//src/v/base",
         "//src/v/lsm/io:persistence",
+        "//src/v/ssx:time",
         "//src/v/utils:named_type",
         "@abseil-cpp//absl/time",
         "@seastar",
diff --git a/src/v/lsm/db/BUILD b/src/v/lsm/db/BUILD
index af0a51acf7d62..14327513b0d8e 100644
--- a/src/v/lsm/db/BUILD
+++ b/src/v/lsm/db/BUILD
@@ -77,7 +77,9 @@ redpanda_cc_library(
         "//src/v/lsm/core/internal:options",
         "//src/v/lsm/io:persistence",
         "//src/v/lsm/sst:block_cache",
+        "//src/v/ssx:clock",
         "//src/v/ssx:condition_variable",
+        "//src/v/ssx:time",
         "@seastar",
     ],
 )
diff --git a/src/v/lsm/db/impl.cc b/src/v/lsm/db/impl.cc
index abee47930a556..592ac63bfc653 100644
--- a/src/v/lsm/db/impl.cc
+++ b/src/v/lsm/db/impl.cc
@@ -24,6 +24,7 @@
 #include "lsm/io/persistence.h"
 #include "lsm/sst/block_cache.h"
 #include "lsm/sst/builder.h"
+#include "ssx/clock.h"
 
 #include
 #include
 
@@ -248,15 +249,23 @@
 impl::create_internal_iterator() {
     co_return internal::create_merging_iterator(std::move(list));
 }
 
-ss::future<> impl::flush() {
+ss::future<> impl::flush(ssx::instant deadline) {
     if (_opts->readonly) [[unlikely]] {
         throw invalid_argument_exception(
           "attempted to flush a readonly database");
     }
     auto applied_seqno = max_applied_seqno();
     while (applied_seqno > max_persisted_seqno()) {
+        if (ssx::lowres_steady_clock().now() > deadline) {
+            throw io_error_exception(
+              "failed to persist up to seqno {} in time: current persisted "
+              "seqno {}",
+              applied_seqno.value_or(internal::sequence_number(0)),
+              max_persisted_seqno().value_or(internal::sequence_number(0)));
+        }
         if (_imm) {
-            co_await _background_work_finished_signal.wait(_as);
+            co_await _background_work_finished_signal.wait(
+              deadline.to_chrono(), _as);
         } else if (!_mem->empty()) {
             _imm = std::exchange(_mem, ss::make_lw_shared<memtable>());
             maybe_schedule_compaction();
         }
     }
 }
 
+ss::future<> impl::flush() {
+    return impl::flush(ssx::instant::infinite_future());
+}
+
 ss::future<> impl::close() {
     vlog(log.trace, "close_start");
     _as.request_abort_ex(abort_requested_exception("database closing"));
diff --git a/src/v/lsm/db/impl.h b/src/v/lsm/db/impl.h
index 6bd56ec9a2481..5876bafdf7aad 100644
--- a/src/v/lsm/db/impl.h
+++ b/src/v/lsm/db/impl.h
@@ -23,6 +23,7 @@
 #include "lsm/db/version_set.h"
 #include "lsm/io/persistence.h"
 #include "ssx/condition_variable.h"
+#include "ssx/time.h"
 
 #include
 
@@ -82,6 +83,9 @@ class impl {
     ss::optimized_optional> create_snapshot();
 
     // Flush any pending state in memtables to disk.
+    ss::future<> flush(ssx::instant deadline);
+
+    // Flush with no deadline.
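+    // Equivalent to flush(ssx::instant::infinite_future()).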
     ss::future<> flush();
 
     // Close the database, no more operations should happen to the database at
diff --git a/src/v/lsm/db/tests/BUILD b/src/v/lsm/db/tests/BUILD
index acd8914770a05..1719b9cf96bb5 100644
--- a/src/v/lsm/db/tests/BUILD
+++ b/src/v/lsm/db/tests/BUILD
@@ -173,6 +173,8 @@ redpanda_cc_gtest(
         "//src/v/lsm/io:memory_persistence",
         "//src/v/lsm/sst:builder",
         "//src/v/random:generators",
+        "//src/v/ssx:clock",
+        "//src/v/ssx:time",
         "//src/v/test_utils:gtest",
         "@googletest//:gtest",
         "@seastar",
diff --git a/src/v/lsm/db/tests/impl_test.cc b/src/v/lsm/db/tests/impl_test.cc
index 64214203bea94..699addb9f8f29 100644
--- a/src/v/lsm/db/tests/impl_test.cc
+++ b/src/v/lsm/db/tests/impl_test.cc
@@ -15,6 +15,7 @@
 #include "lsm/db/impl.h"
 #include "lsm/io/memory_persistence.h"
 #include "random/generators.h"
+#include "ssx/clock.h"
 #include "test_utils/async.h"
 
 #include
 
@@ -131,7 +132,8 @@ class ImplTest : public testing::Test {
           .level_one_compaction_trigger = 2,
         });
         _underlying_data_persistence = lsm::io::make_memory_data_persistence();
-        _meta_persistence = lsm::io::make_memory_metadata_persistence();
+        _meta_persistence = lsm::io::make_memory_metadata_persistence(
+          &_meta_persistence_controller);
         _tracking_data = std::make_unique(
           _underlying_data_persistence.get());
        open();
@@ -250,6 +252,7 @@ class ImplTest : public testing::Test {
     shadow_map _shadow;
     ss::lw_shared_ptr _options;
     std::unique_ptr<lsm::io::data_persistence> _underlying_data_persistence;
+    lsm::io::memory_persistence_controller _meta_persistence_controller;
     std::unique_ptr<lsm::io::metadata_persistence> _meta_persistence;
     std::unique_ptr _tracking_data;
     std::unique_ptr _db;
@@ -282,6 +285,19 @@ TEST_F(ImplTest, Recovery) {
     EXPECT_TRUE(matches_shadow());
 }
 
+TEST_F(ImplTest, FlushFailure) {
+    write_at_least(128_KiB);
+    EXPECT_TRUE(matches_shadow());
+    tests::drain_task_queue().get();
+    _meta_persistence_controller.should_fail = true;
+    EXPECT_ANY_THROW(
+      _db
+        ->flush(
+          ssx::lowres_steady_clock().now() + ssx::duration::milliseconds(100))
+        .get());
+    EXPECT_FALSE(max_persisted_seqno().has_value());
+}
+
 TEST_F(ImplTest, Randomized) {
 #ifndef NDEBUG
     int rounds = 100;
diff --git a/src/v/lsm/io/memory_persistence.cc b/src/v/lsm/io/memory_persistence.cc
index b4bb1252a0ae6..2abbf96d18290 100644
--- a/src/v/lsm/io/memory_persistence.cc
+++ b/src/v/lsm/io/memory_persistence.cc
@@ -126,8 +126,14 @@ class memory_sequential_file_writer : public sequential_file_writer {
 
 class data_impl : public data_persistence {
 public:
+    explicit data_impl(memory_persistence_controller* controller)
+      : _controller(controller) {}
+
     ss::future<std::unique_ptr<random_access_reader>>
     open_random_access_reader(internal::file_handle h) override {
+        if (_controller && _controller->should_fail) {
+            throw io_error_exception("injected error");
+        }
         auto it = _data.find(h);
         std::unique_ptr<random_access_reader> ptr;
         if (it != _data.end()) {
@@ -139,6 +145,9 @@
     ss::future<std::unique_ptr<sequential_file_writer>>
     open_sequential_writer(internal::file_handle h) override {
+        if (_controller && _controller->should_fail) {
+            throw io_error_exception("injected error");
+        }
         auto it = _data.try_emplace(
           h, ss::make_lw_shared(h));
         co_return std::make_unique<memory_sequential_file_writer>(
@@ -146,6 +155,9 @@
     }
 
     ss::future<> remove_file(internal::file_handle h) override {
+        if (_controller && _controller->should_fail) {
+            throw io_error_exception("injected error");
+        }
         auto it = _data.find(h);
         if (it == _data.end()) {
             co_return;
@@ -159,6 +171,9 @@
 
     ss::coroutine::experimental::generator<internal::file_handle>
     list_files() override {
+        if (_controller && _controller->should_fail) {
+            throw io_error_exception("injected error");
+        }
         auto it = _data.begin();
         while (it != _data.end()) {
             auto key = it->first;
@@ -183,13 +198,15 @@
     }
 
 private:
+    memory_persistence_controller* _controller;
     bool _closed = false;
     std::map> _data;
 };
 
 class metadata_impl : public metadata_persistence {
 public:
-    explicit metadata_impl() = default;
+    explicit metadata_impl(memory_persistence_controller* controller)
+      : _controller(controller) {}
     metadata_impl(const metadata_impl&) = delete;
     metadata_impl(metadata_impl&&) = delete;
     metadata_impl& operator=(const metadata_impl&) = delete;
@@ -199,6 +216,9 @@
     }
     ss::future<std::optional<iobuf>>
    read_manifest(internal::database_epoch e) override {
+        if (_controller && _controller->should_fail) {
+            throw io_error_exception("injected error");
+        }
         if (e > _epoch) {
             co_return std::nullopt;
         }
@@ -206,6 +226,9 @@
     }
     ss::future<>
     write_manifest(internal::database_epoch epoch, iobuf b) override {
+        if (_controller && _controller->should_fail) {
+            throw io_error_exception("injected error");
+        }
         _epoch = epoch;
         _latest = std::move(b);
         co_return;
@@ -217,6 +240,7 @@
     }
 
 private:
+    memory_persistence_controller* _controller;
     bool _closed = false;
     internal::database_epoch _epoch;
     std::optional<iobuf> _latest;
@@ -224,12 +248,14 @@
 
 } // namespace
 
-std::unique_ptr<data_persistence> make_memory_data_persistence() {
-    return std::make_unique<data_impl>();
+std::unique_ptr<data_persistence>
+make_memory_data_persistence(memory_persistence_controller* controller) {
+    return std::make_unique<data_impl>(controller);
 }
 
-std::unique_ptr<metadata_persistence> make_memory_metadata_persistence() {
-    return std::make_unique<metadata_impl>();
+std::unique_ptr<metadata_persistence>
+make_memory_metadata_persistence(memory_persistence_controller* controller) {
+    return std::make_unique<metadata_impl>(controller);
 }
 
 } // namespace lsm::io
diff --git a/src/v/lsm/io/memory_persistence.h b/src/v/lsm/io/memory_persistence.h
index 6c807b1dfc137..3844f61f2c798 100644
--- a/src/v/lsm/io/memory_persistence.h
+++ b/src/v/lsm/io/memory_persistence.h
@@ -15,10 +15,16 @@
 
 namespace lsm::io {
 
+struct memory_persistence_controller {
+    bool should_fail{false};
+};
+
 // Create an in memory ephemeral data persistence layer for testing.
-std::unique_ptr<data_persistence> make_memory_data_persistence();
+std::unique_ptr<data_persistence>
+make_memory_data_persistence(memory_persistence_controller* = nullptr);
 
 // Create an in memory ephemeral metadata persistence layer for testing.
-std::unique_ptr<metadata_persistence> make_memory_metadata_persistence();
+std::unique_ptr<metadata_persistence>
+make_memory_metadata_persistence(memory_persistence_controller* = nullptr);
 
 } // namespace lsm::io
diff --git a/src/v/lsm/lsm.cc b/src/v/lsm/lsm.cc
index 891cafe44748b..b91e6e8c609c6 100644
--- a/src/v/lsm/lsm.cc
+++ b/src/v/lsm/lsm.cc
@@ -17,6 +17,7 @@
 #include "lsm/core/internal/options.h"
 #include "lsm/db/impl.h"
 #include "lsm/db/memtable.h"
+#include "ssx/time.h"
 
 #include
 
@@ -179,7 +180,9 @@ std::optional<sequence_number> database::max_applied_seqno() const {
       [](auto seqno) { return from_internal_seqno(seqno); });
 }
 
-ss::future<> database::flush() { return _impl->flush(); }
+ss::future<> database::flush(ssx::instant deadline) {
+    return _impl->flush(deadline);
+}
 
 ss::future<> database::apply(write_batch batch) {
     auto b = std::move(batch._batch);
diff --git a/src/v/lsm/lsm.h b/src/v/lsm/lsm.h
index e0c08dce030ac..3459fd435eace 100644
--- a/src/v/lsm/lsm.h
+++ b/src/v/lsm/lsm.h
@@ -14,6 +14,7 @@
 #include "absl/time/time.h"
 #include "base/seastarx.h"
 #include "lsm/io/persistence.h"
+#include "ssx/time.h"
 #include "utils/named_type.h"
 
 #include
 
@@ -175,7 +176,7 @@ class database {
 
     // Flush existing buffered data such that `max_persisted_offset()`
     // becomes >= the current `max_applied_offset()`.
-    ss::future<> flush();
+    ss::future<> flush(ssx::instant deadline);
 
     // Apply a batch of data atomically to the database.
     //