Skip to content

Commit

Permalink
allow raft apply committed logs before they are persisted (#537)
Browse files Browse the repository at this point in the history
Signed-off-by: glorv <[email protected]>
  • Loading branch information
glorv authored Mar 28, 2024
1 parent 4126862 commit 3cfa667
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 30 deletions.
4 changes: 2 additions & 2 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5286,7 +5286,7 @@ fn test_group_commit_consistent() {
/// of the election with both priority and log.
#[test]
fn test_election_with_priority_log() {
let tests = vec![
let tests = [
// log is up to date or not 1..3, priority 1..3, id, state
(true, false, false, 3, 1, 1, 1, StateRole::Leader),
(true, false, false, 2, 2, 2, 1, StateRole::Leader),
Expand All @@ -5301,7 +5301,7 @@ fn test_election_with_priority_log() {
(false, false, true, 1, 1, 3, 1, StateRole::Leader),
];

for &(l1, l2, l3, p1, p2, p3, id, state) in tests.iter() {
for (l1, l2, l3, p1, p2, p3, id, state) in tests {
let l = default_logger();
let mut n1 = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l);
let mut n2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l);
Expand Down
5 changes: 5 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ pub struct Config {

/// Max size for committed entries in a `Ready`.
pub max_committed_size_per_ready: u64,

/// Maximum raft log number that can be applied after commit but before persist.
/// The default value is 0, which means apply after both commit and persist.
pub max_apply_unpersisted_log_limit: u64,
}

impl Default for Config {
Expand All @@ -120,6 +124,7 @@ impl Default for Config {
priority: 0,
max_uncommitted_size: NO_LIMIT,
max_committed_size_per_ready: NO_LIMIT,
max_apply_unpersisted_log_limit: 0,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,8 @@ before taking old, removed peers offline.
#![deny(clippy::all)]
#![deny(missing_docs)]
#![recursion_limit = "128"]
// TODO: remove this when we update the mininum rust compatible version.
#![allow(unused_imports)]
// This is necessary to support prost and rust-protobuf at the same time.
#![allow(clippy::useless_conversion)]
// This lint recommends some bad choices sometimes.
Expand Down
35 changes: 31 additions & 4 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ impl<T: Storage> Raft<T> {
r: RaftCore {
id: c.id,
read_states: Default::default(),
raft_log: RaftLog::new(store, logger.clone()),
raft_log: RaftLog::new(store, logger.clone(), c),
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
pending_request_snapshot: INVALID_INDEX,
Expand Down Expand Up @@ -378,7 +378,9 @@ impl<T: Storage> Raft<T> {
r.load_state(&raft_state.hard_state);
}
if c.applied > 0 {
r.commit_apply(c.applied);
// at initialize, it is possible that applied_index > committed_index,
// so we should skip the check at `commit_apply`.
r.commit_apply_internal(c.applied, true);
}
r.become_follower(r.term, INVALID_ID);

Expand Down Expand Up @@ -596,6 +598,11 @@ impl<T: Storage> Raft<T> {
pub fn set_check_quorum(&mut self, check_quorum: bool) {
self.check_quorum = check_quorum;
}

/// Set the maximum limit that applied index can be ahead of persisted index.
pub fn set_max_apply_unpersisted_log_limit(&mut self, limit: u64) {
self.raft_log.max_apply_unpersisted_log_limit = limit;
}
}

impl<T: Storage> RaftCore<T> {
Expand Down Expand Up @@ -946,10 +953,30 @@ impl<T: Storage> Raft<T> {
/// # Hooks
///
/// * Post: Checks to see if it's time to finalize a Joint Consensus state.
#[inline]
pub fn commit_apply(&mut self, applied: u64) {
self.commit_apply_internal(applied, false)
}

/// Commit that the Raft peer has applied up to the given index.
///
/// Registers the new applied index to the Raft log.
/// if `skip_check` is true, will skip the applied_index check, this is only
/// used at initialization.
///
/// # Hooks
///
/// * Post: Checks to see if it's time to finalize a Joint Consensus state.
fn commit_apply_internal(&mut self, applied: u64, skip_check: bool) {
let old_applied = self.raft_log.applied;
#[allow(deprecated)]
self.raft_log.applied_to(applied);
if !skip_check {
#[allow(deprecated)]
self.raft_log.applied_to(applied);
} else {
// skip applied_index check at initialization.
assert!(applied > 0);
self.raft_log.applied_to_unchecked(applied);
}

// TODO: it may never auto_leave if leader steps down before enter joint is applied.
if self.prs.conf().auto_leave
Expand Down
Loading

0 comments on commit 3cfa667

Please sign in to comment.