r/stm: added support for state machines without snapshot at offset cap
To support fast partition movement in Redpanda, the
`raft::state_machine_base` interface requires state machine implementers
to support taking a snapshot at an arbitrary, already applied offset.
This requirement is easy to satisfy for all existing state machines, as
they do not require Raft snapshots at all.
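For illustration, here is the shape of that requirement in isolation. This is a hypothetical sketch, not the actual `raft::state_machine_base` interface: to snapshot at an arbitrary applied offset, a state machine must in general be able to materialize its state as of that offset, which means retaining some history.

// Hypothetical illustration only; these names are not the Redpanda
// interface, they just show the shape of the snapshot-at-offset problem.
#include <cstdint>
#include <iterator>
#include <map>
#include <stdexcept>

class history_keeping_stm {
public:
    void apply(int64_t offset, int64_t delta) {
        _value += delta;
        _state_at[offset] = _value; // retain the state per applied offset
    }

    // Snapshot at an arbitrary, already applied offset: the state as it
    // was at `offset`, not merely the latest state.
    int64_t snapshot_at(int64_t offset) const {
        auto it = _state_at.upper_bound(offset);
        if (it == _state_at.begin()) {
            throw std::out_of_range("offset not applied yet");
        }
        return std::prev(it)->second;
    }

private:
    int64_t _value{0};
    std::map<int64_t, int64_t> _state_at;
};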

In the future we may leverage Raft protocol snapshots, as they provide a
nice way to keep the log size bounded without compromising consistency.

This PR introduces changes to the `state_machine_manager` and
`state_machine_base` interfaces allowing a state machine implementer to
opt out of the requirement to provide the snapshot-at-offset capability.
Without this capability the partition that the STM is built on will not
be fast movable, but the `take_snapshot` implementation will be very
simple, as it only requires serializing the current in-memory state of
the state machine.
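By contrast, an opted-out state machine only ever snapshots whatever is in memory at the moment. A minimal sketch under the same caveat (hypothetical names, assuming the manager only asks for a snapshot of the current applied state):

// Minimal sketch of the opted-out case; names are illustrative, and
// iobuf here is a stand-in for Redpanda's iobuf type.
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

using iobuf = std::vector<std::byte>;

class simple_stm {
public:
    void apply(int64_t key, int64_t value) { _kv[key] = value; }

    // No offset parameter: the snapshot is simply the serialized current
    // in-memory state, so no per-offset history has to be kept.
    iobuf take_snapshot() const {
        iobuf buf;
        for (const auto& [k, v] : _kv) {
            const int64_t pair[2] = {k, v};
            const auto* p = reinterpret_cast<const std::byte*>(pair);
            buf.insert(buf.end(), p, p + sizeof(pair));
        }
        return buf;
    }

private:
    std::map<int64_t, int64_t> _kv; // state applied from the log
};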

Signed-off-by: Michał Maślanka <[email protected]>
mmaslankaprv committed Oct 2, 2024
1 parent aeb8e29 commit 122e87f
Showing 7 changed files with 18 additions and 17 deletions.
src/v/cluster/distributed_kv_stm.h (2 changes: 1 addition & 1 deletion)
@@ -95,7 +95,7 @@ class distributed_kv_stm final : public raft::persisted_stm<> {
static constexpr std::string_view name = Name;
explicit distributed_kv_stm(
size_t max_partitions, ss::logger& logger, raft::consensus* raft)
- : persisted_stm<>("distributed_kv_stm.snapshot", logger, raft)
+ : raft::persisted_stm<>("distributed_kv_stm.snapshot", logger, raft)
, _default_max_partitions(max_partitions)
, _is_routing_partition(_raft->ntp().tp.partition == routing_partition) {}

src/v/cluster/id_allocator_stm.cc (4 changes: 2 additions & 2 deletions)
@@ -41,14 +41,14 @@ id_allocator_stm::id_allocator_stm(ss::logger& logger, raft::consensus* c)

id_allocator_stm::id_allocator_stm(
ss::logger& logger, raft::consensus* c, config::configuration& cfg)
- : persisted_stm(id_allocator_snapshot, logger, c)
+ : raft::persisted_stm<>(id_allocator_snapshot, logger, c)
, _batch_size(cfg.id_allocator_batch_size.value())
, _log_capacity(cfg.id_allocator_log_capacity.value()) {}

ss::future<bool>
id_allocator_stm::sync(model::timeout_clock::duration timeout) {
auto term = _insync_term;
- auto is_synced = co_await persisted_stm::sync(timeout);
+ auto is_synced = co_await raft::persisted_stm<>::sync(timeout);
if (is_synced) {
if (term != _insync_term) {
_curr_id = _state;
src/v/cluster/log_eviction_stm.cc (6 changes: 3 additions & 3 deletions)
@@ -40,19 +40,19 @@ struct snapshot_data

log_eviction_stm::log_eviction_stm(
raft::consensus* raft, ss::logger& logger, storage::kvstore& kvstore)
- : persisted_stm("log_eviction_stm.snapshot", logger, raft, kvstore) {}
+ : base_t("log_eviction_stm.snapshot", logger, raft, kvstore) {}

ss::future<> log_eviction_stm::start() {
ssx::spawn_with_gate(_gate, [this] { return monitor_log_eviction(); });
ssx::spawn_with_gate(
_gate, [this] { return handle_log_eviction_events(); });
- return persisted_stm::start();
+ return base_t::start();
}

ss::future<> log_eviction_stm::stop() {
_as.request_abort();
_has_pending_truncation.broken();
- co_await persisted_stm::stop();
+ co_await base_t::stop();
}

ss::future<> log_eviction_stm::handle_log_eviction_events() {
src/v/cluster/log_eviction_stm.h (1 change: 1 addition & 0 deletions)
@@ -118,6 +118,7 @@ class log_eviction_stm
virtual ss::future<model::offset> storage_eviction_event();

private:
+ using base_t = raft::persisted_stm<raft::kvstore_backed_stm_snapshot>;
void increment_start_offset(model::offset);
bool should_process_evict(model::offset);

src/v/cluster/rm_stm.cc (9 changes: 5 additions & 4 deletions)
@@ -749,7 +749,7 @@ ss::future<> rm_stm::stop() {
co_await raft::persisted_stm<>::stop();
}

- ss::future<> rm_stm::start() { return persisted_stm::start(); }
+ ss::future<> rm_stm::start() { return raft::persisted_stm<>::start(); }

std::optional<int32_t>
rm_stm::get_seq_number(model::producer_identity pid) const {
@@ -1255,7 +1255,7 @@ rm_stm::do_aborted_transactions(model::offset from, model::offset to) {

ss::future<bool> rm_stm::sync(model::timeout_clock::duration timeout) {
auto current_insync_term = _insync_term;
- auto ready = co_await persisted_stm::sync(timeout);
+ auto ready = co_await raft::persisted_stm<>::sync(timeout);
if (ready) {
if (current_insync_term != _insync_term) {
_last_known_lso = model::offset{-1};
@@ -1935,7 +1935,8 @@ uint64_t rm_stm::get_local_snapshot_size() const {
clusterlog.trace,
"rm_stm: aborted snapshots size {}",
abort_snapshots_size);
- return persisted_stm::get_local_snapshot_size() + abort_snapshots_size;
+ return raft::persisted_stm<>::get_local_snapshot_size()
+ + abort_snapshots_size;
}

ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) {
@@ -2016,7 +2017,7 @@ ss::future<> rm_stm::do_remove_persistent_state() {
co_await _abort_snapshot_mgr.remove_snapshot(filename);
}
co_await _abort_snapshot_mgr.remove_partial_snapshots();
- co_return co_await persisted_stm::remove_persistent_state();
+ co_return co_await raft::persisted_stm<>::remove_persistent_state();
}

ss::future<> rm_stm::apply_raft_snapshot(const iobuf&) {
src/v/cluster/tests/partition_properties_stm_test.cc (9 changes: 4 additions & 5 deletions)
@@ -301,11 +301,10 @@ TEST_F_CORO(
auto base_offset = co_await node->random_batch_base_offset(
node->raft()->committed_offset(), model::offset(100));
auto snapshot_offset = model::prev_offset(base_offset);
-
- co_await node->raft()->write_snapshot(raft::write_snapshot_cfg(
- snapshot_offset,
- co_await node->raft()->stm_manager()->take_snapshot(
- snapshot_offset)));
+ auto result = co_await node->raft()->stm_manager()->take_snapshot(
+ snapshot_offset);
+ co_await node->raft()->write_snapshot(
+ raft::write_snapshot_cfg(snapshot_offset, std::move(result.data)));
}

// test follower recovery with snapshot
src/v/cluster/tm_stm.cc (4 changes: 2 additions & 2 deletions)
@@ -64,7 +64,7 @@ tm_stm::tm_stm(
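The test change above shows the decoupled flow this enables: `stm_manager()->take_snapshot(offset)` now returns a result whose `data` field carries the snapshot payload, and the caller hands that payload to `raft()->write_snapshot(...)` via `raft::write_snapshot_cfg`, instead of nesting the `take_snapshot` call inside the `write_snapshot` invocation.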
, _feature_table(feature_table)
, _ctx_log(logger, ssx::sformat("[{}]", _raft->ntp())) {}

- ss::future<> tm_stm::start() { co_await persisted_stm::start(); }
+ ss::future<> tm_stm::start() { co_await raft::persisted_stm<>::start(); }

uint8_t tm_stm::active_snapshot_version() { return tm_snapshot::version; }

@@ -179,7 +179,7 @@ tm_stm::do_sync(model::timeout_clock::duration timeout) {
co_return tm_stm::op_status::not_leader;
}

- auto ready = co_await persisted_stm::sync(timeout);
+ auto ready = co_await raft::persisted_stm<>::sync(timeout);
if (!ready) {
co_return tm_stm::op_status::unknown;
}
