Skip to content

Commit

Permalink
handle_committed_entries after saving hard state
Browse files Browse the repository at this point in the history
Signed-off-by: Egor Ivkov <[email protected]>
  • Loading branch information
e-ivkov committed Oct 25, 2022
1 parent 36d3293 commit 3673722
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
8 changes: 8 additions & 0 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,15 @@ fn on_ready(
}
}

let mut _last_apply_index = 0;
let mut handle_committed_entries =
|rn: &mut RawNode<MemStorage>, committed_entries: Vec<Entry>| {
for entry in committed_entries {
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
// It is important to save the index only after setting hard state
// So that the precondition applied <= committed cannot be broken on restart.
_last_apply_index = entry.index;
if entry.data.is_empty() {
// From new elected leaders.
continue;
Expand Down Expand Up @@ -325,6 +331,8 @@ fn on_ready(
// Raft HardState changed, and we need to persist it.
store.wl().set_hardstate(hs.clone());
}
// Apply all committed entries.
handle_committed_entries(raft_group, ready.take_committed_entries());

if !ready.persisted_messages().is_empty() {
// Send out the persisted messages come from the node.
Expand Down
4 changes: 3 additions & 1 deletion examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeC
for entry in committed_entries {
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
// It is important to save the index only after setting hard state
// So that the precondition applied <= committed cannot be broken on restart.
_last_apply_index = entry.index;

if entry.data.is_empty() {
Expand All @@ -147,7 +149,6 @@ fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeC
// TODO: handle EntryConfChange
}
};
handle_committed_entries(ready.take_committed_entries());

if !ready.entries().is_empty() {
// Append entries to the Raft log.
Expand All @@ -163,6 +164,7 @@ fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeC
// Send out the persisted messages come from the node.
handle_messages(ready.take_persisted_messages());
}
handle_committed_entries(ready.take_committed_entries());

// Advance the Raft.
let mut light_rd = raft_group.advance(ready);
Expand Down

0 comments on commit 3673722

Please sign in to comment.