From 63aec467a669dc46da8f6921ed893cf76eda65e4 Mon Sep 17 00:00:00 2001 From: Dat Tien Nguyen Date: Mon, 21 Oct 2024 13:47:05 +0700 Subject: [PATCH] feat: add disable_proposal_forwarding config params (#552) Signed-off-by: Dat Tien Nguyen --- .../tests/integration_cases/test_raw_node.rs | 71 +++++++++++++++++++ src/config.rs | 4 ++ src/raft.rs | 11 +++ 3 files changed, 86 insertions(+) diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs index 5cea3f55..16b40631 100644 --- a/harness/tests/integration_cases/test_raw_node.rs +++ b/harness/tests/integration_cases/test_raw_node.rs @@ -1876,6 +1876,77 @@ fn test_committed_entries_pagination_after_restart() { } } +#[test] +fn test_disable_proposal_forwarding() { + let l = default_logger(); + + let n1 = new_test_raft_with_config( + &Config { + id: 1, + heartbeat_tick: 1, + election_tick: 10, + disable_proposal_forwarding: false, + ..Default::default() + }, + MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])), + &l, + ); + + let n2 = new_test_raft_with_config( + &Config { + id: 2, + heartbeat_tick: 1, + election_tick: 10, + disable_proposal_forwarding: false, + ..Default::default() + }, + MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])), + &l, + ); + + let n3 = new_test_raft_with_config( + &Config { + id: 3, + heartbeat_tick: 1, + election_tick: 10, + disable_proposal_forwarding: true, + ..Default::default() + }, + MemStorage::new_with_conf_state((vec![1, 2, 3], vec![])), + &l, + ); + + let mut network = Network::new(vec![Some(n1), Some(n2), Some(n3)], &l); + + // node 1 starts campaign to become leader. + network.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); + + // send proposal to n2(follower) where DisableProposalForwarding is false + assert_eq!( + network + .peers + .get_mut(&2) + .unwrap() + .step(new_message(2, 2, MessageType::MsgPropose, 1)), + Ok(()) + ); + + // verify n2(follower) does forward the proposal when DisableProposalForwarding is false + assert_eq!(network.peers.get(&2).unwrap().msgs.len(), 1); + + // send proposal to n3(follower) where DisableProposalForwarding is true + assert_eq!( + network + .peers + .get_mut(&3) + .unwrap() + .step(new_message(3, 3, MessageType::MsgPropose, 1)), + Err(Error::ProposalDropped) + ); + + assert!(network.peers.get(&3).unwrap().msgs.is_empty()); +} + #[derive(Default)] struct IgnoreSizeHintMemStorage { inner: MemStorage, diff --git a/src/config.rs b/src/config.rs index 1bc6d323..3668db88 100644 --- a/src/config.rs +++ b/src/config.rs @@ -102,6 +102,9 @@ pub struct Config { /// Maximum raft log number that can be applied after commit but before persist. /// The default value is 0, which means apply after both commit and persist. pub max_apply_unpersisted_log_limit: u64, + + /// If enable, followers will not forward proposal to leader. + pub disable_proposal_forwarding: bool, } impl Default for Config { @@ -125,6 +128,7 @@ impl Default for Config { max_uncommitted_size: NO_LIMIT, max_committed_size_per_ready: NO_LIMIT, max_apply_unpersisted_log_limit: 0, + disable_proposal_forwarding: false, } } } diff --git a/src/raft.rs b/src/raft.rs index 91d90209..60b92a6c 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -233,6 +233,7 @@ pub struct RaftCore { skip_bcast_commit: bool, batch_append: bool, + disable_proposal_forwarding: bool, heartbeat_timeout: usize, election_timeout: usize, @@ -363,6 +364,7 @@ impl Raft { last_log_tail_index: 0, }, max_committed_size_per_ready: c.max_committed_size_per_ready, + disable_proposal_forwarding: c.disable_proposal_forwarding, }, }; confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?; @@ -2337,6 +2339,15 @@ impl Raft { term = self.term; ); return Err(Error::ProposalDropped); + } else if self.disable_proposal_forwarding { + info!( + self.logger, + "{from} not forwarding to leader {to} at term {term}; dropping proposal", + from = self.id, + to = self.leader_id, + term = self.term; + ); + return Err(Error::ProposalDropped); } m.to = self.leader_id; self.r.send(m, &mut self.msgs);