Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Michał Maślanka <[email protected]>
  • Loading branch information
mmaslankaprv committed Aug 21, 2024
1 parent 5e3d995 commit c8283f1
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 24 deletions.
8 changes: 5 additions & 3 deletions src/v/raft/recovery_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ recovery_stm::required_snapshot_type recovery_stm::get_required_snapshot_type(
*/
if (
follower_metadata.is_learner && _ptr->get_learner_start_offset()
&& follower_metadata.next_index < *_ptr->get_learner_start_offset()) {
&& follower_metadata.next_index < *_ptr->get_learner_start_offset()
&& _ptr->stm_manager()->supports_snapshot_at_offset()) {
// current snapshot moved beyond configured learner start offset, we can
// use current snapshot instead creating a new on demand one
if (*_ptr->get_learner_start_offset() <= _ptr->last_snapshot_index()) {
Expand Down Expand Up @@ -470,8 +471,9 @@ recovery_stm::take_on_demand_snapshot(model::offset last_included_offset) {
iobuf snapshot_data;

if (_ptr->stm_manager()) {
snapshot_data = co_await _ptr->stm_manager()->take_snapshot(
last_included_offset);
snapshot_data = (co_await _ptr->stm_manager()->take_snapshot(
last_included_offset))
.data;
}
auto cfg = _ptr->_configuration_manager.get(last_included_offset);
const auto term = _ptr->log()->get_term(last_included_offset);
Expand Down
33 changes: 28 additions & 5 deletions src/v/raft/state_machine_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
#include "raft/offset_monitor.h"

namespace raft {

using snapshot_at_offset_supported
= ss::bool_class<struct snapshot_at_offset_supported_tag>;
class consensus;
/**
* State machine interface. The class provides an interface that must be
* implemented to build state machine that can be registered in
* state_machine_manager.
*/

class state_machine_base {
public:
state_machine_base() = default;
Expand Down Expand Up @@ -56,13 +58,19 @@ class state_machine_base {
* returned in previous `state_machine_base::take_snapshot()` calls
*/
virtual ss::future<> apply_raft_snapshot(const iobuf&) = 0;

/**
* Returns a snapshot of an STM state with requested last included offset
* Returns a snapshot of an STM state with requested last included offset.
*/
virtual ss::future<iobuf>
take_snapshot(model::offset last_included_offset) = 0;
ss::future<iobuf> take_raft_snapshot(model::offset last_included_offset)

{
vassert(
supports_snapshot_at_offset()
|| last_included_offset == last_applied_offset(),
"if state machine do not support snapshots at offset it is only "
"allowed to call take snapshot with the last applied offset");
return take_snapshot(last_included_offset);
};
/**
* Last successfully applied offset
*/
Expand All @@ -82,7 +90,22 @@ class state_machine_base {
*/
virtual ss::future<> remove_local_state() = 0;

/**
* Returns true if a state machine supports taking a snapshot at any offset.
* It the state machine allows taking snapshot at any applied offset the
* partition will support fast reconfigurations.
*/
virtual snapshot_at_offset_supported supports_snapshot_at_offset() const {
return snapshot_at_offset_supported::yes;
}

protected:
/**
* Returns a snapshot of an STM state with requested last included offset.
* For implementer
*/
virtual ss::future<iobuf>
take_snapshot(model::offset last_included_offset) = 0;
/**
* Lifecycle is managed by state_machine_manager
*/
Expand Down
50 changes: 47 additions & 3 deletions src/v/raft/state_machine_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "raft/state_machine_manager.h"

#include "base/vassert.h"
#include "bytes/iostream.h"
#include "config/property.h"
#include "model/fundamental.h"
Expand Down Expand Up @@ -165,6 +166,9 @@ state_machine_manager::state_machine_manager(
, _log(ctx_log(_raft->group(), _raft->ntp()))
, _apply_sg(apply_sg) {
for (auto& n_stm : stms) {
_supports_snapshot_at_offset
= _supports_snapshot_at_offset
&& n_stm.stm->supports_snapshot_at_offset();
_machines.try_emplace(
n_stm.name,
ss::make_lw_shared<state_machine_entry>(
Expand Down Expand Up @@ -482,8 +486,13 @@ ss::future<> state_machine_manager::background_apply_fiber(
entry->name);
}

ss::future<iobuf>
ss::future<state_machine_manager::snapshot_result>
state_machine_manager::take_snapshot(model::offset last_included_offset) {
vassert(
static_cast<bool>(_supports_snapshot_at_offset),
"Snapshot at arbitrary offset can only be taken if manager supports fast "
"reconfigurations");

vlog(
_log.debug,
"taking snapshot with last included offset: {}",
Expand All @@ -506,14 +515,49 @@ state_machine_manager::take_snapshot(model::offset last_included_offset) {
managed_snapshot snapshot;
co_await ss::coroutine::parallel_for_each(
_machines, [last_included_offset, &snapshot](auto entry_pair) {
return entry_pair.second->stm->take_snapshot(last_included_offset)
return entry_pair.second->stm
->take_raft_snapshot(last_included_offset)
.then([&snapshot, key = entry_pair.first](auto snapshot_part) {
snapshot.snapshot_map.try_emplace(
key, std::move(snapshot_part));
});
});

co_return state_machine_manager::snapshot_result{
serde::to_iobuf(std::move(snapshot)), last_included_offset};
}

ss::future<state_machine_manager::snapshot_result>
state_machine_manager::take_snapshot() {
if (last_applied() < _raft->start_offset()) {
throw std::logic_error(fmt::format(
"Can not take snapshot of a state from before raft start offset. "
"Requested offset: {}, start offset: {}",
last_applied(),
_raft->start_offset()));
}
auto holder = _gate.hold();
// wait for all STMs to be on the same page
co_await wait(last_applied(), model::no_timeout, _as);

auto u = co_await _apply_mutex.get_units();
// wait once again for all state machines to apply the same baches
co_await wait(last_applied(), model::no_timeout, _as);
// snapshot can only be taken after all background applies finished
auto units = co_await acquire_background_apply_mutexes();
auto snapshot_offset = last_applied();
managed_snapshot snapshot;
co_await ss::coroutine::parallel_for_each(
_machines, [snapshot_offset, &snapshot](auto entry_pair) {
return entry_pair.second->stm->take_raft_snapshot(snapshot_offset)
.then([&snapshot, key = entry_pair.first](auto snapshot_part) {
snapshot.snapshot_map.try_emplace(
key, std::move(snapshot_part));
});
});

co_return serde::to_iobuf(std::move(snapshot));
co_return state_machine_manager::snapshot_result{
serde::to_iobuf(std::move(snapshot)), snapshot_offset};
}

ss::future<> state_machine_manager::wait(
Expand Down
27 changes: 26 additions & 1 deletion src/v/raft/state_machine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ concept StateMachineIterateFunc = requires(
*/
class state_machine_manager final {
public:
/**
* A result returned after taking a snapshot it contains a serde serialized
* snapshot data and last offset included into the snapshot.
*/
struct snapshot_result {
iobuf data;
model::offset last_included_offset;
};

// wait until at least offset is applied to all the state machines
ss::future<> wait(
model::offset,
Expand All @@ -75,13 +84,28 @@ class state_machine_manager final {
* state i.e last snapshot index is derived from last_applied_offset. In
* Redpanda we use different approach. Data eviction policy forces us to
* allow state machines to take snapshot at arbitrary offsets.
*
* This is only supported if all state machines support taking snapshots at
* arbitrary offset.
*/
ss::future<iobuf> take_snapshot(model::offset);
ss::future<snapshot_result> take_snapshot(model::offset);

/**
* If any of the state machines in the manager doesn't support fast
* reconfigurations this is the only API that the user is allowed to call,
* the take snapshot with offset other than _last_applied_offset will fail.
*/
ss::future<snapshot_result> take_snapshot();

ss::future<> start();
ss::future<> stop();

model::offset last_applied() const { return model::prev_offset(_next); }

snapshot_at_offset_supported supports_snapshot_at_offset() const {
return _supports_snapshot_at_offset;
}

/**
* Returns a pointer to specific type of state machine.
*
Expand Down Expand Up @@ -192,6 +216,7 @@ class state_machine_manager final {
ss::gate _gate;
ss::abort_source _as;
ss::scheduling_group _apply_sg;
snapshot_at_offset_supported _supports_snapshot_at_offset{true};
};

/**
Expand Down
21 changes: 11 additions & 10 deletions src/v/raft/tests/persisted_stm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -432,16 +432,17 @@ struct persisted_stm_test_fixture : state_machine_fixture {
});

// take snapshots on all of the nodes
co_await parallel_for_each_node(
[snapshot_offset](raft_node_instance& n) {
return n.raft()
->stm_manager()
->take_snapshot(snapshot_offset)
.then([raft = n.raft(), snapshot_offset](iobuf snapshot_data) {
return raft->write_snapshot(raft::write_snapshot_cfg(
snapshot_offset, std::move(snapshot_data)));
});
});
co_await parallel_for_each_node([snapshot_offset](
raft_node_instance& n) {
return n.raft()
->stm_manager()
->take_snapshot(snapshot_offset)
.then([raft = n.raft(), snapshot_offset](
state_machine_manager::snapshot_result snapshot_result) {
return raft->write_snapshot(raft::write_snapshot_cfg(
snapshot_offset, std::move(snapshot_result.data)));
});
});
}

ss::future<> take_local_snapshot_on_every_node() {
Expand Down
62 changes: 60 additions & 2 deletions src/v/raft/tests/stm_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,10 @@ TEST_F_CORO(state_machine_fixture, test_recovery_from_snapshot) {
return n.raft()
->stm_manager()
->take_snapshot(snapshot_offset)
.then([raft = n.raft(), snapshot_offset](iobuf snapshot_data) {
.then([raft = n.raft(), snapshot_offset](
state_machine_manager::snapshot_result snapshot_result) {
return raft->write_snapshot(raft::write_snapshot_cfg(
snapshot_offset, std::move(snapshot_data)));
snapshot_offset, std::move(snapshot_result.data)));
});
});

Expand Down Expand Up @@ -555,3 +556,60 @@ TEST_F_CORO(state_machine_fixture, test_all_machines_throw) {
ASSERT_EQ_CORO(stm->state, expected);
}
}

class non_fast_movable_kv : public simple_kv {
public:
static constexpr std::string_view name = "other_persited_kv_stm";
explicit non_fast_movable_kv(raft_node_instance& rn)
: simple_kv(rn) {}

ss::future<iobuf> take_snapshot(model::offset last_included_offset) final {
vassert(
last_included_offset == last_applied_offset(),
"state machine do not support snapshot at arbitrary offset");
co_return serde::to_iobuf(state);
}

snapshot_at_offset_supported supports_snapshot_at_offset() const final {
return snapshot_at_offset_supported::no;
}
};

TEST_F_CORO(state_machine_fixture, test_opt_out_from_snapshot_at_offset) {
create_nodes();
std::vector<ss::shared_ptr<simple_kv>> stms;
for (auto& [id, node] : nodes()) {
raft::state_machine_manager_builder builder;
builder.create_stm<simple_kv>(*node);
builder.create_stm<non_fast_movable_kv>(*node);

co_await node->init_and_start(all_vnodes(), std::move(builder));
}

for (auto& [_, node] : nodes()) {
ASSERT_FALSE_CORO(
node->raft()->stm_manager()->supports_snapshot_at_offset());
}

auto expected = co_await build_random_state(1000);

// take snapshots on all of the nodes
absl::flat_hash_map<model::node_id, model::offset> offsets;
for (auto& [id, node] : nodes()) {
auto o = co_await node->raft()->stm_manager()->take_snapshot().then(
[raft = node->raft()](
state_machine_manager::snapshot_result snapshot_data) {
return raft
->write_snapshot(raft::write_snapshot_cfg(
snapshot_data.last_included_offset,
std::move(snapshot_data.data)))
.then([o = snapshot_data.last_included_offset] { return o; });
});
offsets[id] = o;
}

for (const auto& [id, n] : nodes()) {
ASSERT_EQ_CORO(
n->raft()->start_offset(), model::next_offset(offsets[id]));
}
}

0 comments on commit c8283f1

Please sign in to comment.