l1: state machine for replicated LSM #29160
Conversation
Pull request overview
This PR introduces a state machine (STM) for managing a replicated LSM database backed by cloud storage. The STM manages a database manifest and a volatile write buffer (WAL) that together represent the current database state. The core logic enables recovery from cloud storage by allowing manifest resets while maintaining monotonically increasing epochs and sequence numbers.
Key changes include:
- Implementation of four update types for the STM: write batch application, manifest persistence, domain UUID setting, and manifest reset
- State management including volatile buffer, persisted manifest, and offset/epoch deltas for recovery scenarios
- Comprehensive test coverage for all update operations and edge cases
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/v/cloud_topics/level_one/metastore/domain_uuid.h | Defines domain_uuid type alias for database identification |
| src/v/cloud_topics/level_one/metastore/BUILD | Adds build target for domain_uuid library |
| src/v/cloud_topics/level_one/metastore/lsm/write_batch_row.h | Defines write_batch_row structure for key-value pairs |
| src/v/cloud_topics/level_one/metastore/lsm/state.h | Defines lsm_state and related structures for STM state management |
| src/v/cloud_topics/level_one/metastore/lsm/state.cc | Implements state copy operations |
| src/v/cloud_topics/level_one/metastore/lsm/lsm_update.h | Declares four update types and their interfaces |
| src/v/cloud_topics/level_one/metastore/lsm/lsm_update.cc | Implements validation and application logic for all update types |
| src/v/cloud_topics/level_one/metastore/lsm/stm.h | Declares the replicated state machine interface |
| src/v/cloud_topics/level_one/metastore/lsm/stm.cc | Implements STM with replication, snapshotting, and update application |
| src/v/cloud_topics/level_one/metastore/lsm/BUILD | Defines build targets for all LSM components |
| src/v/cloud_topics/level_one/metastore/lsm/tests/lsm_update_test.cc | Comprehensive unit tests for all update operations |
| src/v/cloud_topics/level_one/metastore/lsm/tests/BUILD | Build configuration for test targets |
This will be used to uniquely identify a given L1 metastore domain. The expectation is that all SSTs and manifests for a given domain will be prefixed by domain_uuid to avoid collisions.
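For illustration, a minimal sketch of the kind of key layout this implies; the helper name and the `<domain_uuid>/<object>` scheme are assumptions, not the PR's actual layout:

```cpp
// Hypothetical illustration of domain-scoped object keys; the helper name and
// the "<domain_uuid>/<object>" layout are assumptions, not the PR's scheme.
#include <fmt/core.h>
#include <string>

std::string domain_scoped_key(
  const std::string& domain_uuid, const std::string& object_name) {
    // e.g. "2b1c.../manifest/0001" or "2b1c.../sst/000042.sst"
    return fmt::format("{}/{}", domain_uuid, object_name);
}
```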
Adds initial STM state that will back a replicated lsm::database. See comments in state.h for more details.
Adds state updates that will drive updates to the LSM that will back the metastore. The operations are:
- Set domain UUID: this is meant to be the first operation replicated by the STM. All further requests are predicated on the domain UUID being correct, and this domain UUID uniquely identifies the underlying database (e.g. it changes when a domain is restored).
- Reset manifest: this is used to restore a manifest from cloud, and reset the database to the state persisted in cloud.
- Write write_batch: these batches contain key-value pairs to be updated in the underlying database. When applied, they are _not_ applied to the database directly; they are instead kept in memory so that, upon becoming leader, a new leader is able to apply the key-value pairs.
- Persist manifest: this is used by the replicated LSM metadata layer to store the database manifest. Once applied, subsequent leaders will open the database from this manifest and then apply any pending write batches.
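For orientation, a rough sketch of the shape these four updates might take; the field names and types below are illustrative assumptions, not the actual declarations in lsm_update.h:

```cpp
// Illustrative sketch only: field names and types are assumptions, not the
// declarations from lsm_update.h.
#include <array>
#include <cstdint>
#include <string>
#include <utility>
#include <variant>
#include <vector>

struct set_domain_uuid_update {
    // Uniquely identifies the underlying database; changes when a domain is
    // restored. Expected to be the first update replicated by the STM.
    std::array<uint8_t, 16> domain_uuid{};
};

struct reset_manifest_update {
    // Manifest recovered from cloud storage, used to reset the database.
    std::vector<uint8_t> serialized_manifest;
};

struct apply_write_batch_update {
    // Key-value rows; kept in the in-memory volatile buffer rather than
    // applied to the database directly.
    std::vector<std::pair<std::string, std::string>> rows;
};

struct persist_manifest_update {
    // Manifest that subsequent leaders will open the database from.
    std::vector<uint8_t> serialized_manifest;
};

using lsm_update = std::variant<
  set_domain_uuid_update,
  reset_manifest_update,
  apply_write_batch_update,
  persist_manifest_update>;
```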
Introduces a new STM that manages an instance of the newly introduced lsm_state. This is meant to be used along with object storage to manage a single lsm::database across replicas. The code here is pretty much all boilerplate copied from other instances of STMs (e.g. metastore/simple_stm). An upcoming change will introduce a new lsm::metadata_persistence that will replicate LSM manifests using this newly introduced STM so that lsm::databases can be opened from this STM on leaders.
```cpp
// Rows that aren't persisted to object storage yet but should be applied
// to the database. When opening a database from the persisted manifest,
// these rows must be applied to the opened database to catch it up.
std::deque<volatile_row> volatile_buffer;
```
NOTE: having the volatile buffer be in memory isn't a hard requirement. Another implementation would be to have leaders read the Raft log to replay the write batches, and have the readers filter out at replay time the rows that don't belong to the current domain. I implemented it with it explicitly being in memory because I found it easier to reason about correctness, but it shouldn't be too difficult to remove (just a matter of correctly accounting for the max removable local log offset)
Does this double the write buffer memory usage, or will we share out of these bytes?
Should this be a chunked fifo instead of a deque?
```cpp
state.volatile_buffer.push_back(
  volatile_row{.seqno = seqno, .row = std::move(row)});
```
When we apply a manifest update we pop off by seqno, so it looks like there is an implied monotonic order. But do we enforce that here?
Chatted on slack, it's somewhat implied by virtue of us calling apply() with the Raft log offset of the update
```cpp
auto update = serde::read<apply_write_batch_update>(value_parser);
auto result = update.apply(state_, o);
```
Where are the volatile rows consumed?
Chatted on slack, it's in the next PR in replicated_database
This might fail if the sequence number is incorrect. How will the caller (the code that replicated the batch) know that this happened?
I think it would be fair to make this a vassert. This is an internal invariant maintained by the state machine by virtue of being called with monotonically increasing offsets
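A minimal sketch of what that check could look like, using a plain assert and hypothetical member names (the real code would presumably use vassert and whatever offset bookkeeping the STM already keeps):

```cpp
// Sketch only: hypothetical names, plain assert standing in for vassert.
#include <cassert>
#include <cstdint>

struct lsm_state_sketch {
    int64_t last_applied_offset{-1};
};

void apply_write_batch_at(lsm_state_sketch& state, int64_t offset) {
    // apply() is driven with strictly increasing raft offsets, so a violation
    // here would indicate an internal bug rather than a caller error.
    assert(offset > state.last_applied_offset);
    state.last_applied_offset = offset;
    // ... append rows to the volatile buffer at seqno = offset + seqno_delta
}
```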
```cpp
}
lsm::sequence_number seqno(base_offset() + state.seqno_delta);
for (auto& row : rows) {
    state.volatile_buffer.push_back(
```
So in the ctp STM there are a lot of concerns around idempotency. I think this is only a concern with local snapshots or something? Anyway, is that not applicable here, or how do we avoid hitting that case?
Idempotency and general concurrency is left up to callers.
My thinking is that it will look like:
- open a given database in term T
- take a lock to ensure only one fiber is replicating at a time (for the sake of example, this could be a lock on the database, but could be finer grained, like a row lock)
- check invariants of the database while the lock is held
- replicate the writes in term T
- if there is a timeout, retry replication without dropping the lock to ensure the writes make it through (or step down as leader)
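A sketch of that caller-side flow; all types and method names below (db_handle, replicate, step_down) are stand-ins for illustration, not the PR's API:

```cpp
// Sketch of the caller-side flow above; types and method names are stand-ins.
#include <chrono>
#include <mutex>

struct db_handle {
    std::mutex write_lock; // one replicating fiber at a time
    long opened_term{1};   // term the database was opened in
    bool invariants_hold() const { return true; }                          // stub
    bool replicate(std::chrono::milliseconds /*timeout*/) { return true; } // stub
    void step_down() {}                                                    // stub
};

bool write_in_term(db_handle& db, long term) {
    // Take a lock so only one fiber replicates at a time (could be finer
    // grained, e.g. a row lock), and check invariants while it is held.
    std::scoped_lock lock(db.write_lock);
    if (db.opened_term != term || !db.invariants_hold()) {
        return false;
    }
    // Replicate in term T; on timeout, retry without dropping the lock so the
    // write either lands or we step down as leader.
    for (int attempt = 0; attempt < 3; ++attempt) {
        if (db.replicate(std::chrono::seconds(10))) {
            return true;
        }
    }
    db.step_down();
    return false;
}
```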
I mean in terms of the snapshots and log replays. I think the answer is that we don't support snapshots at offsets, so we don't have to worry about it? The ctp STM does, but because its updates are applied idempotently, it's okay to replay old state.
I think in this case snapshots are going to require us to re-open the database to discard writes that might have happened while it was an old leader.
> I think in this case snapshots are going to require us to re-open the database to discard writes that might have happened while it was an old leader
Ah, I think I understand what you're asking. You're right, this STM doesn't support snapshots at offsets. And yes, when we become leader we'll need to sync the STM and reopen the database to ensure we don't miss out on writes from previous terms.
```cpp
ss::future<lsm_stm_snapshot> stm::make_snapshot() const {
    lsm_stm_snapshot snapshot;
    snapshot.state = state_.copy();
```
is there ever a concern for stalls here?
Fair question -- certainly at some point, but I'm not sure. I will need to measure a bit.
```cpp
fmt::iterator format_to(fmt::iterator it) const {
    return fmt::format_to(
      it, "{{key: {}, value: {}}}", key, value.linearize_to_string());
```
maybe limit the value length?
linearize_to_string() will throw if the value is over 128KiB. Also worth noting (and maybe I'll add a comment) this is only used in tests
```cpp
//
// TODO: use named types to enforce correct arithmetic rules.
int64_t seqno_delta{0};
int64_t db_epoch_delta{0};
```
Isn't it simpler to seed the start offset and term with correct values during recovery?
With deltas, most of the code will always run, and will mostly be tested, with both deltas set to zero. If one of these values is not taken into account, it'll be easy for the bug to go unnoticed.
I don't think it's simpler, because that means we would need to know those values at the time of partition creation. Fair concern about testing mostly with zero -- I'll give it some thought to see if there are ways to make this more bulletproof, though my hope is that this STM is kept fairly simple and doesn't gain too much more complexity around seqnos and epochs.
```cpp
volatile_row{
  .seqno = r.seqno,
  .row = write_batch_row{
    .key = r.row.key, .value = r.row.value.copy()}});
```
share?
```cpp
ret.rows.push_back(
  write_batch_row{
    .key = r.key,
    .value = r.value.copy(),
```
share?
```cpp
// The deltas ensure that future raft offsets map correctly to database
// seqnos/epochs.
state.seqno_delta = persisted_seqno - o();
state.db_epoch_delta = persisted_epoch - t();
```
Maybe it makes sense to add an assertion or logging for the case where the current offset or current term is greater than the persisted one. Otherwise it'd be UB.
It's valid for the deltas to be negative, or do you mean something else?
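For concreteness, a tiny worked example of the delta arithmetic; the values are made up, and the point is just that offset + delta stays monotonic whether the delta comes out positive or negative:

```cpp
// Made-up values; offset + delta keeps seqnos monotonic across a reset.
#include <cassert>
#include <cstdint>

int main() {
    int64_t persisted_seqno = 100; // last seqno in the recovered manifest
    int64_t reset_offset = 250;    // raft offset where the reset was applied
    int64_t seqno_delta = persisted_seqno - reset_offset; // -150 here

    // A write batch applied at the next raft offset maps to seqno 101, so
    // database seqnos keep increasing across the reset.
    int64_t next_offset = reset_offset + 1;
    assert(next_offset + seqno_delta == persisted_seqno + 1);
    return 0;
}
```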
```cpp
while (!state.volatile_buffer.empty()
       && state.volatile_buffer.front().seqno
            <= manifest.get_last_seqno()) {
    state.volatile_buffer.pop_front();
```
This is basically a flush operation, right? Everything below or equal to manifest.get_last_seqno() should be applied.
Is there a valid scenario in which the volatile_buffer is not empty after this?
> This is basically a flush operation, right?

That's right.

> Is there a valid scenario in which the volatile_buffer is not empty after this?

Yeah, I think something like this:
- write row1 at seqno1
- call flush on db
- manifest is uploaded with row1, its applied seqno is seqno1
- write row2 at seqno2
- manifest is replicated and applied to the STM
- volatile buffer removes row1 but keeps row2
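As a toy illustration of that scenario, using standard-library types rather than the PR's:

```cpp
// Toy version of the trim loop: the manifest covers seqno 1, so row1 is
// dropped and row2 stays in the volatile buffer.
#include <cassert>
#include <cstdint>
#include <deque>

struct row { int64_t seqno; };

int main() {
    std::deque<row> volatile_buffer{{1}, {2}};
    int64_t manifest_last_seqno = 1;
    while (!volatile_buffer.empty()
           && volatile_buffer.front().seqno <= manifest_last_seqno) {
        volatile_buffer.pop_front();
    }
    assert(volatile_buffer.size() == 1 && volatile_buffer.front().seqno == 2);
    return 0;
}
```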
```cpp
const auto offsets_delta = batch.header().last_offset_delta;
const auto res = co_await _raft->replicate(std::move(batch), opts);
if (res.has_error()) {
    co_return std::unexpected(errc::raft_error);
```
This is an opportunity for concurrency problems to emerge. I think that here we should either use an indefinite timeout or step down from leadership in case of error. Otherwise it will trigger a race condition on timeout.
Yeah generally I agree we need to be cautious about handling failures. I left some thoughts here #29160 (comment) (TLDR either callers retry, or stepdown like you mention)
This adds a state machine that manages a database manifest and write batches that need to be applied to the manifest (dubbed the "volatile buffer", effectively a WAL). The manifest and the volatile buffer together are enough to construct a database that is caught up to the latest point in time. The expectation is that leaders will open the lsm::database from the manifest in the log, and then replay the volatile buffer on the resulting lsm::database.
A key requirement is that the state is recoverable from what's in cloud storage. To that end, one of the updates introduced here allows callers to reset the manifest if the existing manifest is empty, taking care to reset DB epoch and sequence number tracking so that they remain monotonically increasing after the reset. This will be used in a subsequent PR as part of an lsm::metadata_persistence implementation.
The STM piece of this PR is fairly boilerplate -- the core logic introduced in this PR is in lsm_update.*.
A later PR will introduce an abstraction for leaders to wrap the STM to hide syncing and replaying writes.
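A rough sketch of what that leader-side open-then-replay flow might look like; all types and method names below are stand-ins for illustration, since the real abstraction arrives in later PRs:

```cpp
// Stand-in types only; the real lsm::metadata_persistence and leader wrapper
// are introduced in follow-up PRs.
#include <cstdint>
#include <deque>
#include <string>

struct manifest_stub { int64_t last_seqno{0}; };
struct row_stub { int64_t seqno{0}; std::string key; std::string value; };

struct database_stub {
    int64_t applied_seqno{0};
    void apply(const row_stub& r) { applied_seqno = r.seqno; }
};

// Open the database from the persisted manifest, then replay any volatile
// rows newer than the manifest to catch it up to the latest state.
database_stub open_and_catch_up(
  const manifest_stub& manifest, const std::deque<row_stub>& volatile_buffer) {
    database_stub db{manifest.last_seqno};
    for (const auto& r : volatile_buffer) {
        if (r.seqno > manifest.last_seqno) {
            db.apply(r);
        }
    }
    return db;
}
```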
Backports Required
Release Notes