From 2fbeee5b89b22c392da39435b79be9896acedd1d Mon Sep 17 00:00:00 2001 From: Yuyang Wei <844740374@qq.com> Date: Fri, 6 Dec 2024 19:47:03 +0800 Subject: [PATCH] raft: next index shall be larger than match index (#557) * Fixed a case in progress.rs maybe_decr_to that could cause the next value to be less than or equal to the match value in the probe state. Close #555. Signed-off-by: wego1236 <844740374@qq.com> --- harness/tests/integration_cases/test_raft.rs | 53 ++++++++++++++++++++ src/tracker/progress.rs | 4 +- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index a62266d2..644747c5 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -5851,3 +5851,56 @@ fn test_switching_check_quorum() { } assert_eq!(sm.state, StateRole::Leader); } + +fn expect_one_message(r: &mut Interface) -> Message { + let msgs = r.read_messages(); + assert_eq!(msgs.len(), 1, "expect one message"); + msgs[0].clone() +} + +#[test] +fn test_log_replication_with_reordered_message() { + let l = default_logger(); + let mut r1 = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l); + r1.become_candidate(); + r1.become_leader(); + r1.read_messages(); + r1.mut_prs().get_mut(2).unwrap().become_replicate(); + + let mut r2 = new_test_raft(2, vec![1, 2], 10, 1, new_storage(), &l); + + // r1 sends 2 MsgApp messages to r2. + let _ = r1.append_entry(&mut [new_entry(0, 0, SOME_DATA)]); + r1.send_append(2); + let req1 = expect_one_message(&mut r1); + let _ = r1.append_entry(&mut [new_entry(0, 0, SOME_DATA)]); + r1.send_append(2); + let req2 = expect_one_message(&mut r1); + + // r2 receives the second MsgApp first due to reordering. 
+ let _ = r2.step(req2); + let resp2 = expect_one_message(&mut r2); + // r2 rejects req2 + assert!(resp2.reject); + assert_eq!(resp2.reject_hint, 0); + assert_eq!(resp2.index, 2); + + // r2 handles the first MsgApp and responds to r1. + // And r1 updates match index accordingly. + let _ = r2.step(req1); + let m = expect_one_message(&mut r2); + assert!(!m.reject); + assert_eq!(m.index, 2); + let _ = r1.step(m); + assert_eq!(r1.prs().get(2).unwrap().matched, 2); + + // r1 observes a transient network issue to r2, hence transitions to probe state. + let _ = r1.step(new_message(2, 1, MessageType::MsgUnreachable, 0)); + assert_eq!(r1.prs().get(2).unwrap().state, ProgressState::Probe); + + // now r1 receives the delayed resp2. + let _ = r1.step(resp2); + let m = expect_one_message(&mut r1); + // r1 shall re-send MsgApp from match index even if resp2's reject hint is less than the match index. + assert_eq!(r1.prs().get(2).unwrap().matched, m.index) +} diff --git a/src/tracker/progress.rs b/src/tracker/progress.rs index 20fe9225..dc19bb81 100644 --- a/src/tracker/progress.rs +++ b/src/tracker/progress.rs @@ -192,8 +192,8 @@ impl Progress { // Do not decrease next index if it's requesting snapshot. if request_snapshot == INVALID_INDEX { self.next_idx = cmp::min(rejected, match_hint + 1); - if self.next_idx < 1 { - self.next_idx = 1; + if self.next_idx < self.matched + 1 { + self.next_idx = self.matched + 1; } } else if self.pending_request_snapshot == INVALID_INDEX { // Allow requesting snapshot even if it's not Replicate.