Giving state machine implementers a way to opt out of being forced to support snapshotting at an arbitrary offset #22961

Merged: 1 commit, Oct 8, 2024
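For orientation before the diff: `persisted_stm<T>` becomes `persisted_stm_base<BaseT, T>`, where `BaseT` selects either the ordinary `state_machine_base` or the new `no_at_offset_snapshot_stm_base`, which lets an implementation opt out of snapshotting at an arbitrary offset; the derived stms switch to the qualified `raft::persisted_stm<>` spelling or a local `base_t` alias. The sketch below is a simplified, self-contained model of that base-class parameterization, not the actual Redpanda classes; only the two base-class names are taken from the diff, every other identifier is a stand-in.

```cpp
// Simplified, self-contained model of the base-class parameterization in
// this PR. Only state_machine_base and no_at_offset_snapshot_stm_base
// mirror names from the diff; everything else is a stand-in.
#include <cstdint>
#include <iostream>

struct state_machine_base {
    // Regular interface: the snapshot machinery may request a snapshot at
    // any offset it chooses.
    virtual int64_t take_snapshot(int64_t at_offset) { return at_offset; }
    virtual ~state_machine_base() = default;
};

struct no_at_offset_snapshot_stm_base {
    // Opt-out interface: snapshots are only taken at the stm's own last
    // applied offset, so callers cannot force an arbitrary target.
    int64_t last_applied{0};
    virtual int64_t take_snapshot() { return last_applied; }
    virtual ~no_at_offset_snapshot_stm_base() = default;
};

// As in persisted_stm.cc, the persisted stm is now templated on its base.
template<typename BaseT>
class persisted_stm_base : public BaseT {};

int main() {
    persisted_stm_base<no_at_offset_snapshot_stm_base> stm;
    stm.last_applied = 42;
    std::cout << stm.take_snapshot() << '\n'; // prints 42
}
```

In the real code a second template parameter independently selects the snapshot backend (file-backed or kvstore-backed), as the explicit instantiations at the end of persisted_stm.cc show.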
2 changes: 1 addition & 1 deletion src/v/cluster/distributed_kv_stm.h
@@ -95,7 +95,7 @@ class distributed_kv_stm final : public raft::persisted_stm<> {
static constexpr std::string_view name = Name;
explicit distributed_kv_stm(
size_t max_partitions, ss::logger& logger, raft::consensus* raft)
: persisted_stm<>("distributed_kv_stm.snapshot", logger, raft)
: raft::persisted_stm<>("distributed_kv_stm.snapshot", logger, raft)
, _default_max_partitions(max_partitions)
, _is_routing_partition(_raft->ntp().tp.partition == routing_partition) {}

4 changes: 2 additions & 2 deletions src/v/cluster/id_allocator_stm.cc
@@ -41,14 +41,14 @@ id_allocator_stm::id_allocator_stm(ss::logger& logger, raft::consensus* c)

id_allocator_stm::id_allocator_stm(
ss::logger& logger, raft::consensus* c, config::configuration& cfg)
: persisted_stm(id_allocator_snapshot, logger, c)
: raft::persisted_stm<>(id_allocator_snapshot, logger, c)
, _batch_size(cfg.id_allocator_batch_size.value())
, _log_capacity(cfg.id_allocator_log_capacity.value()) {}

ss::future<bool>
id_allocator_stm::sync(model::timeout_clock::duration timeout) {
auto term = _insync_term;
auto is_synced = co_await persisted_stm::sync(timeout);
auto is_synced = co_await raft::persisted_stm<>::sync(timeout);
if (is_synced) {
if (term != _insync_term) {
_curr_id = _state;
12 changes: 6 additions & 6 deletions src/v/cluster/log_eviction_stm.cc
@@ -40,19 +40,19 @@ struct snapshot_data

log_eviction_stm::log_eviction_stm(
raft::consensus* raft, ss::logger& logger, storage::kvstore& kvstore)
: persisted_stm("log_eviction_stm.snapshot", logger, raft, kvstore) {}
: base_t("log_eviction_stm.snapshot", logger, raft, kvstore) {}

ss::future<> log_eviction_stm::start() {
ssx::spawn_with_gate(_gate, [this] { return monitor_log_eviction(); });
ssx::spawn_with_gate(
_gate, [this] { return handle_log_eviction_events(); });
return persisted_stm::start();
return base_t::start();
}

ss::future<> log_eviction_stm::stop() {
_as.request_abort();
_has_pending_truncation.broken();
co_await persisted_stm::stop();
co_await base_t::stop();
}

ss::future<> log_eviction_stm::handle_log_eviction_events() {
@@ -200,7 +200,7 @@ log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) {
_log.debug,
"Requesting raft snapshot with final offset: {}",
truncation_point);
auto snapshot_data = co_await _raft->stm_manager()->take_snapshot(
auto snapshot_result = co_await _raft->stm_manager()->take_snapshot(
truncation_point);
// we need to check the snapshot index again as it may have already
// progressed after the snapshot was taken by the stm_manager
@@ -214,8 +214,8 @@ log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) {
truncation_point);
co_return;
}
co_await _raft->write_snapshot(
raft::write_snapshot_cfg(truncation_point, std::move(snapshot_data)));
co_await _raft->write_snapshot(raft::write_snapshot_cfg(
snapshot_result.last_included_offset, std::move(snapshot_result.data)));
}

kafka::offset log_eviction_stm::kafka_start_offset_override() {
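The log_eviction_stm.cc change above also reflects the new return shape of `stm_manager()->take_snapshot()`: the call now yields both the snapshot data and the offset it actually covers (`last_included_offset`), which can sit below the requested truncation point when an stm has opted out of at-offset snapshots, so `write_snapshot` must use the returned offset rather than the requested one. A minimal sketch of the idea under stand-in types (the real `iobuf`/`model::offset` types are not reproduced here):

```cpp
// Illustration only: why the result carries last_included_offset.
// All types and names here are stand-ins, not the Redpanda API.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>

struct snapshot_result {
    std::string data;              // stand-in for the serialized snapshot
    int64_t last_included_offset;  // offset actually covered by the data
};

snapshot_result take_snapshot(int64_t requested, int64_t max_snapshotable) {
    // An stm that opted out of at-offset snapshots caps the snapshot at the
    // highest offset it can cover on its own.
    int64_t actual = std::min(requested, max_snapshotable);
    return {"<bytes>", actual};
}

int main() {
    auto r = take_snapshot(/*requested=*/100, /*max_snapshotable=*/80);
    // The raft snapshot must be written at r.last_included_offset (80 here),
    // not at the originally requested truncation point (100).
    std::cout << r.last_included_offset << '\n';
}
```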
1 change: 1 addition & 0 deletions src/v/cluster/log_eviction_stm.h
@@ -118,6 +118,7 @@ class log_eviction_stm
virtual ss::future<model::offset> storage_eviction_event();

private:
using base_t = raft::persisted_stm<raft::kvstore_backed_stm_snapshot>;
void increment_start_offset(model::offset);
bool should_process_evict(model::offset);

9 changes: 5 additions & 4 deletions src/v/cluster/rm_stm.cc
@@ -749,7 +749,7 @@ ss::future<> rm_stm::stop() {
co_await raft::persisted_stm<>::stop();
}

ss::future<> rm_stm::start() { return persisted_stm::start(); }
ss::future<> rm_stm::start() { return raft::persisted_stm<>::start(); }

std::optional<int32_t>
rm_stm::get_seq_number(model::producer_identity pid) const {
@@ -1255,7 +1255,7 @@ rm_stm::do_aborted_transactions(model::offset from, model::offset to) {

ss::future<bool> rm_stm::sync(model::timeout_clock::duration timeout) {
auto current_insync_term = _insync_term;
auto ready = co_await persisted_stm::sync(timeout);
auto ready = co_await raft::persisted_stm<>::sync(timeout);
if (ready) {
if (current_insync_term != _insync_term) {
_last_known_lso = model::offset{-1};
@@ -1935,7 +1935,8 @@ uint64_t rm_stm::get_local_snapshot_size() const {
clusterlog.trace,
"rm_stm: aborted snapshots size {}",
abort_snapshots_size);
return persisted_stm::get_local_snapshot_size() + abort_snapshots_size;
return raft::persisted_stm<>::get_local_snapshot_size()
+ abort_snapshots_size;
}

ss::future<> rm_stm::save_abort_snapshot(abort_snapshot snapshot) {
@@ -2016,7 +2017,7 @@ ss::future<> rm_stm::do_remove_persistent_state() {
co_await _abort_snapshot_mgr.remove_snapshot(filename);
}
co_await _abort_snapshot_mgr.remove_partial_snapshots();
co_return co_await persisted_stm::remove_persistent_state();
co_return co_await raft::persisted_stm<>::remove_persistent_state();
}

ss::future<> rm_stm::apply_raft_snapshot(const iobuf&) {
9 changes: 4 additions & 5 deletions src/v/cluster/tests/partition_properties_stm_test.cc
@@ -301,11 +301,10 @@ TEST_F_CORO(
auto base_offset = co_await node->random_batch_base_offset(
node->raft()->committed_offset(), model::offset(100));
auto snapshot_offset = model::prev_offset(base_offset);

co_await node->raft()->write_snapshot(raft::write_snapshot_cfg(
snapshot_offset,
co_await node->raft()->stm_manager()->take_snapshot(
snapshot_offset)));
auto result = co_await node->raft()->stm_manager()->take_snapshot(
snapshot_offset);
co_await node->raft()->write_snapshot(
raft::write_snapshot_cfg(snapshot_offset, std::move(result.data)));
}

// test follower recovery with snapshot
4 changes: 2 additions & 2 deletions src/v/cluster/tm_stm.cc
@@ -64,7 +64,7 @@ tm_stm::tm_stm(
, _feature_table(feature_table)
, _ctx_log(logger, ssx::sformat("[{}]", _raft->ntp())) {}

ss::future<> tm_stm::start() { co_await persisted_stm::start(); }
ss::future<> tm_stm::start() { co_await raft::persisted_stm<>::start(); }

uint8_t tm_stm::active_snapshot_version() { return tm_snapshot::version; }

@@ -179,7 +179,7 @@ tm_stm::do_sync(model::timeout_clock::duration timeout) {
co_return tm_stm::op_status::not_leader;
}

auto ready = co_await persisted_stm::sync(timeout);
auto ready = co_await raft::persisted_stm<>::sync(timeout);
if (!ready) {
co_return tm_stm::op_status::unknown;
}
114 changes: 67 additions & 47 deletions src/v/raft/persisted_stm.cc
@@ -54,9 +54,9 @@ stm_snapshot_key(const ss::sstring& snapshot_name, const model::ntp& ntp) {

} // namespace

template<supported_stm_snapshot T>
template<typename BaseT, supported_stm_snapshot T>
template<typename... Args>
persisted_stm<T>::persisted_stm(
persisted_stm_base<BaseT, T>::persisted_stm_base(
ss::sstring snapshot_mgr_name,
ss::logger& logger,
raft::consensus* c,
@@ -66,20 +66,20 @@ persisted_stm<T>::persisted_stm(
, _snapshot_backend(snapshot_mgr_name, _log, c, std::forward<Args>(args)...) {
}

template<supported_stm_snapshot T>
template<typename BaseT, supported_stm_snapshot T>
ss::future<std::optional<stm_snapshot>>
persisted_stm<T>::load_local_snapshot() {
persisted_stm_base<BaseT, T>::load_local_snapshot() {
return _snapshot_backend.load_snapshot();
}
template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::stop() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::stop() {
_apply_lock.broken();
co_await raft::state_machine_base::stop();
co_await _gate.close();
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::remove_persistent_state() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::remove_persistent_state() {
return _snapshot_backend.remove_persistent_state();
}

@@ -283,13 +283,13 @@ size_t kvstore_backed_stm_snapshot::get_snapshot_size() const {
return 0;
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::wait_for_snapshot_hydrated() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::wait_for_snapshot_hydrated() {
return _on_snapshot_hydrated.wait([this] { return _snapshot_hydrated; });
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::do_write_local_snapshot() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::do_write_local_snapshot() {
auto u = co_await _apply_lock.get_units();
auto snapshot = co_await take_local_snapshot(std::move(u));
auto offset = snapshot.header.offset;
@@ -298,27 +298,27 @@ ss::future<> persisted_stm<T>::do_write_local_snapshot() {
_last_snapshot_offset = std::max(_last_snapshot_offset, offset);
}

template<supported_stm_snapshot T>
void persisted_stm<T>::write_local_snapshot_in_background() {
template<typename BaseT, supported_stm_snapshot T>
void persisted_stm_base<BaseT, T>::write_local_snapshot_in_background() {
ssx::spawn_with_gate(_gate, [this] { return write_local_snapshot(); });
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::write_local_snapshot() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::write_local_snapshot() {
return _op_lock.with([this]() {
return wait_for_snapshot_hydrated().then(
[this] { return do_write_local_snapshot(); });
});
}

template<supported_stm_snapshot T>
uint64_t persisted_stm<T>::get_local_snapshot_size() const {
template<typename BaseT, supported_stm_snapshot T>
uint64_t persisted_stm_base<BaseT, T>::get_local_snapshot_size() const {
return _snapshot_backend.get_snapshot_size();
}

template<supported_stm_snapshot T>
ss::future<>
persisted_stm<T>::ensure_local_snapshot_exists(model::offset target_offset) {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::ensure_local_snapshot_exists(
model::offset target_offset) {
vlog(
_log.debug,
"ensure snapshot_exists with target offset: {}",
@@ -328,35 +328,35 @@ persisted_stm<T>::ensure_local_snapshot_exists(model::offset target_offset) {
if (target_offset <= _last_snapshot_offset) {
return ss::now();
}
return wait(target_offset, model::no_timeout)
return BaseT::wait(target_offset, model::no_timeout)
.then([this, target_offset]() {
vassert(
target_offset < next(),
target_offset < BaseT::next(),
"[{} ({})] after we waited for target_offset ({}) "
"next ({}) must be greater",
_raft->ntp(),
name(),
target_offset,
next());
BaseT::next());
return do_write_local_snapshot();
});
});
});
}

template<supported_stm_snapshot T>
model::offset persisted_stm<T>::max_collectible_offset() {
template<typename BaseT, supported_stm_snapshot T>
model::offset persisted_stm_base<BaseT, T>::max_collectible_offset() {
return model::offset::max();
}

template<supported_stm_snapshot T>
template<typename BaseT, supported_stm_snapshot T>
ss::future<fragmented_vector<model::tx_range>>
persisted_stm<T>::aborted_tx_ranges(model::offset, model::offset) {
persisted_stm_base<BaseT, T>::aborted_tx_ranges(model::offset, model::offset) {
return ss::make_ready_future<fragmented_vector<model::tx_range>>();
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::wait_offset_committed(
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::wait_offset_committed(
model::timeout_clock::duration timeout,
model::offset offset,
model::term_id term) {
@@ -367,8 +367,8 @@ ss::future<> persisted_stm<T>::wait_offset_committed(
return _raft->commit_index_updated().wait(timeout, stop_cond);
}

template<supported_stm_snapshot T>
ss::future<bool> persisted_stm<T>::do_sync(
template<typename BaseT, supported_stm_snapshot T>
ss::future<bool> persisted_stm_base<BaseT, T>::do_sync(
model::timeout_clock::duration timeout,
model::offset offset,
model::term_id term) {
@@ -403,7 +403,7 @@

if (_raft->term() == term) {
try {
co_await wait(offset, model::timeout_clock::now() + timeout);
co_await BaseT::wait(offset, model::timeout_clock::now() + timeout);
} catch (const ss::broken_condition_variable&) {
co_return false;
} catch (const ss::gate_closed_exception&) {
@@ -440,9 +440,9 @@ ss::future<bool> persisted_stm<T>::do_sync(
co_return false;
}

template<supported_stm_snapshot T>
template<typename BaseT, supported_stm_snapshot T>
ss::future<bool>
persisted_stm<T>::sync(model::timeout_clock::duration timeout) {
persisted_stm_base<BaseT, T>::sync(model::timeout_clock::duration timeout) {
auto term = _raft->term();
if (!_raft->is_leader()) {
return ss::make_ready_future<bool>(false);
@@ -492,12 +492,12 @@ persisted_stm<T>::sync(model::timeout_clock::duration timeout) {
});
}

template<supported_stm_snapshot T>
ss::future<bool> persisted_stm<T>::wait_no_throw(
template<typename BaseT, supported_stm_snapshot T>
ss::future<bool> persisted_stm_base<BaseT, T>::wait_no_throw(
model::offset offset,
model::timeout_clock::time_point deadline,
std::optional<std::reference_wrapper<ss::abort_source>> as) noexcept {
return wait(offset, deadline, as)
return BaseT::wait(offset, deadline, as)
.then([] { return true; })
.handle_exception_type([](const ss::abort_requested_exception&) {
// Shutting down
@@ -519,8 +519,8 @@ ss::future<bool> persisted_stm<T>::wait_no_throw(
});
}

template<supported_stm_snapshot T>
ss::future<> persisted_stm<T>::start() {
template<typename BaseT, supported_stm_snapshot T>
ss::future<> persisted_stm_base<BaseT, T>::start() {
if (_raft->dirty_offset() == model::offset{}) {
co_await _snapshot_backend.perform_initial_cleanup();
}
@@ -549,7 +549,7 @@ ss::future<> persisted_stm<T>::start() {
next_offset);
co_await apply_local_snapshot(
snapshot.header, std::move(snapshot.data));
set_next(next_offset);
BaseT::set_next(next_offset);
_last_snapshot_offset = snapshot.header.offset;
} else {
// This can happen on an out-of-date replica that re-joins the group
@@ -570,15 +570,35 @@
_on_snapshot_hydrated.broadcast();
}

template class persisted_stm<file_backed_stm_snapshot>;
template class persisted_stm<kvstore_backed_stm_snapshot>;
template class persisted_stm_base<state_machine_base, file_backed_stm_snapshot>;
template class persisted_stm_base<
state_machine_base,
kvstore_backed_stm_snapshot>;

template class persisted_stm_base<
no_at_offset_snapshot_stm_base,
file_backed_stm_snapshot>;
template class persisted_stm_base<
no_at_offset_snapshot_stm_base,
kvstore_backed_stm_snapshot>;

template persisted_stm_base<state_machine_base, file_backed_stm_snapshot>::
persisted_stm_base(ss::sstring, seastar::logger&, raft::consensus*);

template persisted_stm<file_backed_stm_snapshot>::persisted_stm(
ss::sstring, seastar::logger&, raft::consensus*);
template persisted_stm_base<state_machine_base, kvstore_backed_stm_snapshot>::
persisted_stm_base(
ss::sstring, seastar::logger&, raft::consensus*, storage::kvstore&);

template persisted_stm<kvstore_backed_stm_snapshot>::persisted_stm(
ss::sstring, seastar::logger&, raft::consensus*, storage::kvstore&);
template persisted_stm_base<
no_at_offset_snapshot_stm_base,
file_backed_stm_snapshot>::
persisted_stm_base(ss::sstring, seastar::logger&, raft::consensus*);

template persisted_stm_base<
no_at_offset_snapshot_stm_base,
kvstore_backed_stm_snapshot>::
persisted_stm_base(
ss::sstring, seastar::logger&, raft::consensus*, storage::kvstore&);
ss::sstring kvstore_backed_stm_snapshot::snapshot_key(
const ss::sstring& snapshot_name, const model::ntp& ntp) {
return stm_snapshot_key(snapshot_name, ntp);