From d34f6936e737563cc930bb8f03970a18f0cba7c9 Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 5 Nov 2019 17:08:39 +0800 Subject: [PATCH] proto: introduce joint-consensus from etcd/raft (#310) Signed-off-by: qupeng --- examples/five_mem_node/main.rs | 4 +- harness/tests/integration_cases/mod.rs | 1 - .../test_membership_changes.rs | 1679 ----------------- harness/tests/integration_cases/test_raft.rs | 4 +- .../tests/integration_cases/test_raw_node.rs | 2 +- harness/tests/test_util/mod.rs | 4 +- proto/proto/eraftpb.proto | 94 +- proto/src/lib.rs | 27 +- src/errors.rs | 13 - src/lib.rs | 34 +- src/progress/progress_set.rs | 319 +--- src/raft.rs | 300 +-- src/raw_node.rs | 16 +- src/storage.rs | 55 +- 14 files changed, 113 insertions(+), 2439 deletions(-) delete mode 100644 harness/tests/integration_cases/test_membership_changes.rs diff --git a/examples/five_mem_node/main.rs b/examples/five_mem_node/main.rs index 6270520b7..a453d59c8 100644 --- a/examples/five_mem_node/main.rs +++ b/examples/five_mem_node/main.rs @@ -300,11 +300,9 @@ fn on_ready( ConfChangeType::AddNode => raft_group.raft.add_node(node_id).unwrap(), ConfChangeType::RemoveNode => raft_group.raft.remove_node(node_id).unwrap(), ConfChangeType::AddLearnerNode => raft_group.raft.add_learner(node_id).unwrap(), - ConfChangeType::BeginMembershipChange - | ConfChangeType::FinalizeMembershipChange => unimplemented!(), } let cs = raft_group.raft.prs().configuration().to_conf_state(); - store.wl().set_conf_state(cs, None); + store.wl().set_conf_state(cs); } else { // For normal proposals, extract the key-value pair and then // insert them into the kv engine. diff --git a/harness/tests/integration_cases/mod.rs b/harness/tests/integration_cases/mod.rs index da64313c5..1bed08b3f 100644 --- a/harness/tests/integration_cases/mod.rs +++ b/harness/tests/integration_cases/mod.rs @@ -11,7 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod test_membership_changes; mod test_raft; mod test_raft_flow_control; mod test_raft_paper; diff --git a/harness/tests/integration_cases/test_membership_changes.rs b/harness/tests/integration_cases/test_membership_changes.rs deleted file mode 100644 index ad4de2522..000000000 --- a/harness/tests/integration_cases/test_membership_changes.rs +++ /dev/null @@ -1,1679 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. -// - -use std::ops::{Deref, DerefMut}; - -use harness::Network; -use protobuf::Message as PbMessage; -use raft::{ - default_logger, - eraftpb::{ - ConfChange, ConfChangeType, ConfState, Entry, EntryType, Message, MessageType, Snapshot, - }, - storage::MemStorage, - Config, Configuration, Raft, Result, INVALID_ID, -}; - -use crate::test_util::new_message; - -type DefaultHashBuilder = std::hash::BuildHasherDefault; -type HashMap = std::collections::HashMap; -type HashSet = std::collections::HashSet; - -// Test that the API itself works. -// -// * Errors are returned from misuse. -// * Happy path returns happy values. -mod api { - use super::*; - // Test that the cluster can transition from a single node to a whole cluster. - #[test] - fn can_transition() -> Result<()> { - let l = default_logger(); - let mut raft = Raft::new( - &Config::new(1), - MemStorage::new_with_conf_state((vec![1], vec![])), - &l, - )?; - let begin_conf_change = begin_conf_change(&[1, 2, 3], &[4], raft.raft_log.last_index() + 1); - raft.begin_membership_change(&begin_conf_change)?; - let finalize_conf_change = finalize_conf_change(); - raft.finalize_membership_change(&finalize_conf_change)?; - Ok(()) - } - - // Test if the process rejects an overlapping voter and learner set. - #[test] - fn checks_for_overlapping_membership() -> Result<()> { - let l = default_logger(); - let mut raft = Raft::new( - &Config::new(1), - MemStorage::new_with_conf_state((vec![1], vec![])), - &l, - )?; - let begin_conf_change = - begin_conf_change(&[1, 2, 3], &[1, 2, 3], raft.raft_log.last_index() + 1); - assert!(raft.begin_membership_change(&begin_conf_change).is_err()); - Ok(()) - } - - // Test if the process rejects an voter demotion. - #[test] - fn checks_for_voter_demotion() -> Result<()> { - let l = default_logger(); - let config = Config::new(1); - let store = MemStorage::new_with_conf_state((vec![1, 2, 3], vec![4])); - let mut raft = Raft::new(&config, store, &l)?; - let begin_conf_change = begin_conf_change(&[1, 2], &[3, 4], raft.raft_log.last_index() + 1); - assert!(raft.begin_membership_change(&begin_conf_change).is_err()); - Ok(()) - } - - // Test if the process rejects an voter demotion. - #[test] - fn finalize_before_begin_fails_gracefully() -> Result<()> { - let l = default_logger(); - let mut raft = Raft::new( - &Config::new(1), - MemStorage::new_with_conf_state((vec![1, 2, 3], vec![4])), - &l, - )?; - let finalize_conf_change = finalize_conf_change(); - assert!(raft - .finalize_membership_change(&finalize_conf_change) - .is_err()); - Ok(()) - } -} - -// Test that small cluster is able to progress through adding a voter. -mod three_peers_add_voter { - use super::*; - - /// In a steady state transition should proceed without issue. - #[test] - fn stable() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 3, 4], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Allowing new peers to catch up."); - scenario.expect_read_and_dispatch_messages_from(&[4, 1, 4, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[4], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3, 4]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[3, 2, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3, 4], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 3, 4]); - - Ok(()) - } -} - -// Test that small cluster is able to progress through adding a learner. -mod three_peers_add_learner { - use super::*; - - /// In a steady state transition should proceed without issue. - #[test] - fn stable() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 3], vec![4]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Allowing new peers to catch up."); - scenario.expect_read_and_dispatch_messages_from(&[4, 1, 4, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[4], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3, 4]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[3, 2, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3, 4], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 3, 4]); - - Ok(()) - } -} - -// Test that small cluster is able to progress through removing a learner. -mod remove_learner { - use super::*; - - /// In a steady state transition should proceed without issue. - #[test] - fn stable() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![4]); - let new_configuration = (vec![1, 2, 3], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3, 4])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3, 4], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[4, 3, 2, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3, 4], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 3, 4]); - - Ok(()) - } -} - -// Test that small cluster is able to progress through removing a voter. -mod remove_voter { - use super::*; - - /// In a steady state transition should proceed without issue. - #[test] - fn stable() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[2, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2]); - - Ok(()) - } -} - -// Test that small cluster is able to progress through removing a leader. -mod remove_leader { - use super::*; - - /// In a steady state transition should proceed without issue. - #[test] - fn stable() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![2, 3], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration.clone(), &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[2, 3, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 3]); - let peer_leaders = scenario.peer_leaders(); - for id in 1..=3 { - assert_eq!(peer_leaders[&id], INVALID_ID, "peer {}", id); - } - - info!(l, "Prompting a new election."); - { - let new_leader = scenario.peers.get_mut(&2).unwrap(); - for _ in new_leader.election_elapsed..=(new_leader.randomized_election_timeout() + 1) { - new_leader.tick(); - } - } - let messages = scenario.read_messages(); - scenario.send(messages); - - info!(l, "Verifying that all peers have the right peer group."); - for (_, peer) in scenario.peers.iter() { - assert_eq!( - peer.prs().configuration(), - &new_configuration.clone().into(), - ); - } - - info!(l, "Verifying that old leader cannot disrupt the cluster."); - { - let old_leader = scenario.peers.get_mut(&1).unwrap(); - for _ in old_leader.heartbeat_elapsed()..=(old_leader.heartbeat_timeout() + 1) { - old_leader.tick(); - } - } - let messages = scenario.read_messages(); - scenario.send(messages); - Ok(()) - } - - /// If the leader fails after the `Begin`, then recovers after the `Finalize`, the group should ignore it. - #[test] - fn leader_fails_and_recovers() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![2, 3], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[2, 3, 1])?; - - scenario.isolate(1); // Simulate the leader failing. - - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[2, 3]); - - // At this point, 1 thinks it is a leader, but actually it isn't anymore. - - info!(l, "Prompting a new election."); - { - let new_leader = scenario.peers.get_mut(&2).unwrap(); - for _ in new_leader.election_elapsed..=(new_leader.randomized_election_timeout() + 1) { - new_leader.tick(); - } - } - let messages = scenario.read_messages(); - scenario.send(messages); - - scenario.recover(); - // Here we note that the old leader (1) has NOT applied the finalize operation and thus thinks it is still leader. - // - // The Raft paper notes that a removed leader should not disrupt the cluster. - // It suggests doing this by ignoring any `RequestVote` when it has heard from the leader within the minimum election timeout. - - info!(l, "Verifying that old leader cannot disrupt the cluster."); - { - let old_leader = scenario.peers.get_mut(&1).unwrap(); - for _ in old_leader.heartbeat_elapsed()..=(old_leader.heartbeat_timeout() + 1) { - old_leader.tick(); - } - } - let messages = scenario.read_messages(); - scenario.send(messages); - - let peer_leader = scenario.peer_leaders(); - assert_ne!(peer_leader[&2], 1); - assert_ne!(peer_leader[&3], 1); - Ok(()) - } -} - -// Test that small cluster is able to progress through replacing a voter. -mod three_peers_replace_voter { - use super::*; - - /// In a steady state transition should proceed without issue. - #[test] - fn stable() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 4], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2]); - - info!(l, "Allowing new peers to catch up."); - scenario.expect_read_and_dispatch_messages_from(&[4, 1, 4, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[4], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 4]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[2, 1, 4])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 4], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 4]); - - Ok(()) - } - - /// The leader power cycles before actually sending the messages. - #[test] - fn leader_power_cycles_no_compaction() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 4], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Leader power cycles."); - assert_eq!(scenario.peers[&1].began_membership_change_at(), Some(3)); - - if let Some(idx) = scenario.peers[&1].began_membership_change_at() { - let raft = scenario.peers.get_mut(&1).unwrap(); - let conf_state: ConfState = raft.prs().configuration().to_conf_state(); - let new_conf_state: ConfState = raft - .prs() - .next_configuration() - .as_ref() - .unwrap() - .to_conf_state(); - raft.mut_store() - .wl() - .set_conf_state(conf_state, Some((new_conf_state, idx))); - } - - scenario.power_cycle(&[1], None); - assert_eq!(scenario.peers[&1].began_membership_change_at(), Some(3)); - scenario.assert_in_membership_change(&[1]); - { - let peer = scenario.peers.get_mut(&1).unwrap(); - peer.become_candidate(); - peer.become_leader(); - for _ in peer.heartbeat_elapsed()..=(peer.heartbeat_timeout() + 1) { - peer.tick(); - } - } - - info!(l, "Allowing new peers to catch up."); - scenario.expect_read_and_dispatch_messages_from(&[4, 1, 4, 1, 4, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[4], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3, 4]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[4, 3, 2, 1, 4, 3, 2, 1])?; - assert_eq!(scenario.peers[&1].began_membership_change_at(), Some(3)); - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3, 4], - 5, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 4]); - - Ok(()) - } - - /// The leader power cycles before actually sending the messages. - #[test] - fn leader_power_cycles_compacted_log() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 4], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Compacting leader's log"); - // This snapshot has a term 1. - let snapshot = { - let peer = scenario.peers.get_mut(&1).unwrap(); - peer.raft_log.store.wl().commit_to_and_set_conf_states( - 3, - Some(peer.prs().configuration().to_conf_state()), - peer.pending_membership_change().clone(), - )?; - let snapshot = peer.raft_log.snapshot(0)?; - peer.raft_log.store.wl().compact(3)?; - snapshot - }; - - // At this point, there is a sentinel at index 3, term 2. - - info!(l, "Leader power cycles."); - assert_eq!(scenario.peers[&1].began_membership_change_at(), Some(3)); - scenario.power_cycle(&[1], snapshot.clone()); - { - let peer = scenario.peers.get_mut(&1).unwrap(); - peer.become_candidate(); - peer.become_leader(); - } - - assert_eq!(scenario.peers[&1].began_membership_change_at(), Some(3)); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Allowing new peers to catch up."); - scenario.expect_read_and_dispatch_messages_from(&[1, 4, 1])?; // 1, 4, 1, 4, 1])?; - scenario.assert_in_membership_change(&[1, 2, 3, 4]); - - { - assert_eq!( - 4, - scenario.peers.get_mut(&4).unwrap().raft_log.unstable.offset - ); - let new_peer = scenario.peers.get_mut(&4).unwrap(); - let snap = new_peer.raft_log.snapshot(0).unwrap(); - new_peer.raft_log.store.wl().apply_snapshot(snap).unwrap(); - new_peer - .raft_log - .stable_snap_to(snapshot.get_metadata().index); - } - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[4, 1, 4, 3, 2, 1, 3, 2, 1])?; - assert_eq!(scenario.peers[&1].began_membership_change_at(), Some(3)); - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3, 4], - 5, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 3, 4]); - - Ok(()) - } - - // Ensure if a peer in the old quorum fails, but the quorum is still big enough, it's ok. - #[test] - fn pending_delete_fails_after_begin() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 4], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - scenario.isolate(3); // Take 3 down. - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2]); - - info!(l, "Allowing new peers to catch up."); - scenario.expect_read_and_dispatch_messages_from(&[4, 1, 4, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[4], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 4]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[2, 1, 4])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 4], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 4]); - - Ok(()) - } - - // Ensure if a peer in the new quorum fails, but the quorum is still big enough, it's ok. - #[test] - fn pending_create_with_quorum_fails_after_begin() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 4], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - scenario.isolate(4); // Take 4 down. - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[2, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 3]); - - Ok(()) - } - - // Ensure if the peer pending a deletion and the peer pending a creation both fail it's still ok (so long as both quorums hold). - #[test] - fn pending_create_and_destroy_both_fail() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 4], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - scenario.isolate(3); // Take 3 down. - scenario.isolate(4); // Take 4 down. - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[2, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2]); - - Ok(()) - } - - // Ensure if the old quorum fails during the joint state progress will halt until the peer group is recovered. - #[test] - fn old_quorum_fails() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 4], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Old quorum fails."); - scenario.isolate(3); // Take 3 down. - scenario.isolate(2); // Take 2 down. - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1, 4, 1, 4, 1, 4])?; - scenario.assert_can_apply_transition_entry_at_index( - &[4], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 4]); - scenario.assert_not_in_membership_change(&[2, 3]); - - info!( - l, - "Spinning for awhile to ensure nothing spectacular happens" - ); - for _ in scenario.peers[&leader].heartbeat_elapsed() - ..=scenario.peers[&leader].heartbeat_timeout() - { - scenario.peers.iter_mut().for_each(|(_, peer)| { - peer.tick(); - }); - let messages = scenario.read_messages(); - scenario.dispatch(messages)?; - } - - scenario.assert_in_membership_change(&[1, 4]); - scenario.assert_not_in_membership_change(&[2, 3]); - - info!(l, "Recovering old qourum."); - scenario.recover(); - - for _ in scenario.peers[&leader].heartbeat_elapsed() - ..=scenario.peers[&leader].heartbeat_timeout() - { - scenario.peers.iter_mut().for_each(|(_, peer)| { - peer.tick(); - }); - } - - info!(l, "Giving the peer group time to recover."); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3, 4, 1, 2, 3, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3, 4]); - - info!(l, "Failed peers confirming they have commited the begin."); - scenario.expect_read_and_dispatch_messages_from(&[2, 3])?; - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3, 4], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 3, 4]); - - Ok(()) - } - - // Ensure if the new quorum fails during the joint state progress will halt until the peer group is recovered. - #[test] - fn new_quorum_fails() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 4], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "New quorum fails."); - scenario.isolate(4); // Take 4 down. - scenario.isolate(2); // Take 2 down. - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1, 3])?; - - info!( - l, - "Leader waits to let the new quorum apply this before progressing." - ); - scenario.assert_in_membership_change(&[1]); - scenario.assert_not_in_membership_change(&[2, 3, 4]); - - info!( - l, - "Spinning for awhile to ensure nothing spectacular happens" - ); - for _ in scenario.peers[&leader].heartbeat_elapsed() - ..=scenario.peers[&leader].heartbeat_timeout() - { - scenario.peers.iter_mut().for_each(|(_, peer)| { - peer.tick(); - }); - let messages = scenario.read_messages(); - scenario.dispatch(messages)?; - } - - scenario.assert_in_membership_change(&[1]); - scenario.assert_not_in_membership_change(&[2, 3, 4]); - - info!(l, "Recovering new qourum."); - scenario.recover(); - - for _ in scenario.peers[&leader].heartbeat_elapsed() - ..=scenario.peers[&leader].heartbeat_timeout() - { - scenario.peers.iter_mut().for_each(|(_, peer)| { - peer.tick(); - }); - } - - info!(l, "Giving the peer group time to recover."); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3, 4, 1, 2, 4, 1, 4, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3, 4], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3, 4]); - - info!(l, "Failed peers confirming they have commited the begin."); - scenario.expect_read_and_dispatch_messages_from(&[2, 4])?; - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3, 4], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 3, 4]); - - Ok(()) - } -} - -// Test that small cluster is able to progress through adding a more with a learner. -mod three_peers_to_five_with_learner { - use super::*; - - /// In a steady state transition should proceed without issue. - #[test] - fn stable() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 3, 4, 5], vec![6]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Allowing new peers to catch up."); - scenario.expect_read_and_dispatch_messages_from(&[4, 5, 6, 1, 4, 5, 6, 1, 4])?; - scenario.assert_can_apply_transition_entry_at_index( - &[4, 5, 6], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3, 4, 5, 6]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[3, 2, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3, 4, 5, 6], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 3, 4, 5, 6]); - - Ok(()) - } - - /// In this, a single node (of 3) halts during the transition. - #[test] - fn minority_old_followers_halt_at_start() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 3, 4, 5], vec![6]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.isolate(3); - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2]); - scenario.assert_not_in_membership_change(&[3]); - - info!(l, "Allowing new peers to catch up."); - scenario.expect_read_and_dispatch_messages_from(&[4, 5, 6, 1, 4, 5, 6, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[4, 5, 6], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 4, 5, 6]); - scenario.assert_not_in_membership_change(&[3]); - - scenario.expect_read_and_dispatch_messages_from(&[4, 5, 6])?; - - info!(l, "Cluster leaving the joint."); - { - let leader = scenario.peers.get_mut(&1).unwrap(); - let ticks = leader.heartbeat_timeout(); - for _ in 0..=ticks { - leader.tick(); - } - } - scenario.expect_read_and_dispatch_messages_from(&[2, 1, 4, 5, 6, 1, 4, 5, 6, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 4, 5], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 4, 5]); - scenario.assert_not_in_membership_change(&[3]); - - Ok(()) - } -} - -mod intermingled_config_changes { - use super::*; - - // In this test, we make sure that if the peer group is sent a `BeginMembershipChange`, then immediately a `AddNode` entry, that the `AddNode` is rejected by the leader. - #[test] - fn begin_then_add_node() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 3, 4], vec![]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader recieves an add node proposal, which it rejects since it is already in transition."); - let _ = scenario.propose_add_node_message(4); - assert_eq!( - scenario.peers[&scenario.old_leader] - .raft_log - .entries(5, 1) - .unwrap()[0] - .get_entry_type(), - EntryType::EntryNormal - ); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Allowing new peers to catch up."); - scenario.expect_read_and_dispatch_messages_from(&[4, 1, 4, 1, 4])?; - scenario.assert_can_apply_transition_entry_at_index( - &[4], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3, 4]); - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[3, 2, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3, 4], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 3, 4]); - - Ok(()) - } -} - -mod compaction { - use super::*; - - // Ensure that if a Raft compacts its log before finalizing that there are no failures. - #[test] - fn begin_compact_then_finalize() -> Result<()> { - let l = default_logger(); - let leader = 1; - let old_configuration = (vec![1, 2, 3], vec![]); - let new_configuration = (vec![1, 2, 3], vec![4]); - let mut scenario = Scenario::new(leader, old_configuration, new_configuration, &l)?; - scenario.spawn_new_peers()?; - scenario.propose_change_message()?; - - info!(l, "Allowing quorum to commit"); - scenario.expect_read_and_dispatch_messages_from(&[1, 2, 3])?; - - info!(l, "Advancing leader, now entered the joint"); - scenario.assert_can_apply_transition_entry_at_index( - &[1], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1]); - - info!(l, "Leader replicates the commit and finalize entry."); - scenario.expect_read_and_dispatch_messages_from(&[1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[2, 3], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3]); - - info!(l, "Allowing new peers to catch up."); - scenario.expect_read_and_dispatch_messages_from(&[4, 1, 4, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[4], - 3, - ConfChangeType::BeginMembershipChange, - ); - scenario.assert_in_membership_change(&[1, 2, 3, 4]); - - info!(l, "Compacting the leaders log"); - scenario - .peers - .get_mut(&1) - .unwrap() - .raft_log - .store - .wl() - .compact(2)?; - - info!(l, "Cluster leaving the joint."); - scenario.expect_read_and_dispatch_messages_from(&[3, 2, 1])?; - scenario.assert_can_apply_transition_entry_at_index( - &[1, 2, 3, 4], - 4, - ConfChangeType::FinalizeMembershipChange, - ); - scenario.assert_not_in_membership_change(&[1, 2, 3, 4]); - - Ok(()) - } -} - -/// A test harness providing some useful utility and shorthand functions appropriate for this test suite. -/// -/// Since it derefs into `Network` it can be used the same way. So it acts as a transparent set of utilities over the standard `Network`. -/// The goal here is to boil down the test suite for Joint Consensus into the simplest terms possible, while allowing for control. -struct Scenario { - old_configuration: Configuration, - old_leader: u64, - new_configuration: Configuration, - network: Network, - logger: slog::Logger, -} -impl Deref for Scenario { - type Target = Network; - fn deref(&self) -> &Network { - &self.network - } -} -impl DerefMut for Scenario { - fn deref_mut(&mut self) -> &mut Network { - &mut self.network - } -} - -// TODO: Explore moving some functionality to `Network`. -impl Scenario { - /// Create a new scenario with the given state. - fn new( - leader: u64, - old_configuration: impl Into, - new_configuration: impl Into, - logger: &slog::Logger, - ) -> Result { - let old_configuration = old_configuration.into(); - let new_configuration = new_configuration.into(); - info!( - logger, - "Beginning scenario, old: {:?}, new: {:?}", old_configuration, new_configuration - ); - let starting_peers = old_configuration - .voters() - .iter() - .chain(old_configuration.learners().iter()) - .map(|&id| { - Some( - Raft::new( - &Config::new(id), - MemStorage::new_with_conf_state(old_configuration.to_conf_state()), - logger, - ) - .unwrap() - .into(), - ) - }) - .collect(); - let mut scenario = Scenario { - old_leader: leader, - old_configuration, - new_configuration, - network: Network::new(starting_peers, &logger), - logger: logger.clone(), - }; - // Elect the leader. - info!( - scenario.logger, - "Sending MsgHup to predetermined leader ({})", leader - ); - let message = new_message(leader, leader, MessageType::MsgHup, 0); - scenario.send(vec![message]); - Ok(scenario) - } - - /// Creates any peers which are pending creation. - /// - /// This *only* creates the peers and adds them to the `Network`. It does not take other - /// action. Newly created peers are only aware of the leader and themself. - fn spawn_new_peers(&mut self) -> Result<()> { - let new_peers = self.new_peers(); - info!(self.logger, "Creating new peers. {:?}", new_peers); - for &id in new_peers.voters() { - let raft = Raft::new( - &Config::new(id), - MemStorage::new_with_conf_state((vec![self.old_leader, id], vec![])), - &self.logger, - )?; - self.peers.insert(id, raft.into()); - } - for &id in new_peers.learners() { - let raft = Raft::new( - &Config::new(id), - MemStorage::new_with_conf_state((vec![self.old_leader], vec![id])), - &self.logger, - )?; - self.peers.insert(id, raft.into()); - } - Ok(()) - } - - /// Return the leader id according to each peer. - fn peer_leaders(&self) -> HashMap { - self.peers - .iter() - .map(|(&id, peer)| (id, peer.leader_id)) - .collect() - } - - /// Return a configuration containing only the peers pending creation. - fn new_peers(&self) -> Configuration { - let all_old = self - .old_configuration - .voters() - .union(&self.old_configuration.learners()) - .cloned() - .collect::>(); - Configuration::new( - self.new_configuration - .voters() - .difference(&all_old) - .cloned(), - self.new_configuration - .learners() - .difference(&all_old) - .cloned(), - ) - } - - /// Send a message proposing a "one-by-one" style AddNode configuration. - /// If the peers are in the midst joint consensus style (Begin/FinalizeMembershipChange) change they should reject it. - fn propose_add_node_message(&mut self, id: u64) -> Result<()> { - info!(self.logger, "Proposing add_node message. Target: {:?}", id,); - let message = build_propose_add_node_message( - self.old_leader, - id, - self.peers[&id].raft_log.last_index() + 1, - ); - self.dispatch(vec![message]) - } - - /// Send the message which proposes the configuration change. - fn propose_change_message(&mut self) -> Result<()> { - info!( - self.logger, - "Proposing change message. Target: {:?}", self.new_configuration - ); - let message = build_propose_change_message( - self.old_leader, - self.new_configuration.voters(), - self.new_configuration.learners(), - self.peers[&1].raft_log.last_index() + 1, - ); - self.dispatch(vec![message]) - } - - /// Checks that the given peers are not in a transition state. - fn assert_not_in_membership_change<'a>(&self, peers: impl IntoIterator) { - for peer in peers.into_iter().map(|id| &self.peers[id]) { - assert!( - !peer.is_in_membership_change(), - "Peer {} should not have been in a membership change.", - peer.id - ); - } - } - - // Checks that the given peers are in a transition state. - fn assert_in_membership_change<'a>(&self, peers: impl IntoIterator) { - for peer in peers.into_iter().map(|id| &self.peers[id]) { - assert!( - peer.is_in_membership_change(), - "Peer {} should have been in a membership change.", - peer.id - ); - } - } - - /// Reads the pending entries to be applied to a raft peer, checks one is of the expected variant, and applies it. Then, it advances the node to that point in the configuration change. - fn expect_apply_membership_change_entry<'a>( - &mut self, - peers: impl IntoIterator, - entry_type: ConfChangeType, - ) -> Result<()> { - for peer in peers { - debug!( - self.logger, - "Advancing peer, expecting an entry."; - "peer" => peer, - "entry type" => ?entry_type, - ); - let peer = self.network.peers.get_mut(peer).unwrap(); - if let Some(entries) = peer.raft_log.next_entries() { - peer.mut_store().wl().append(&entries).unwrap(); - let mut found = false; - for entry in &entries { - if entry.get_entry_type() == EntryType::EntryConfChange { - let mut conf_change = ConfChange::default(); - conf_change.merge_from_bytes(&entry.data)?; - if conf_change.get_change_type() == entry_type { - found = true; - match entry_type { - ConfChangeType::BeginMembershipChange => { - peer.begin_membership_change(&conf_change)? - } - ConfChangeType::FinalizeMembershipChange => { - peer.finalize_membership_change(&conf_change)? - } - ConfChangeType::AddNode => peer.add_node(conf_change.node_id)?, - _ => panic!("Unexpected conf change"), - }; - } - } - if found { - peer.raft_log.stable_to(entry.index, entry.term); - peer.raft_log.commit_to(entry.index); - peer.commit_apply(entry.index); - let hs = peer.hard_state(); - peer.mut_store().wl().set_hardstate(hs); - peer.tick(); - break; - } - } - assert!( - found, - "{:?} message not found for peer {}. Got: {:?}", - entry_type, peer.id, entries - ); - } else { - panic!("Didn't have any entries {}", peer.id); - } - } - Ok(()) - } - - /// Reads messages from each peer in a given list, and dispatches their message before moving to the next peer. - /// - /// Expects each peer to have a message. If the message is not defintely sent use `read_and_dispatch_messages_from`. - fn expect_read_and_dispatch_messages_from<'a>( - &mut self, - peers: impl IntoIterator, - ) -> Result<()> { - let peers = peers.into_iter().cloned(); - for (step, peer) in peers.enumerate() { - info!( - self.logger, - "Expecting and dispatching messages from {peer} at step {step}.", - peer = peer, - step = step, - ); - let messages = self.peers.get_mut(&peer).unwrap().read_messages(); - trace!( - self.logger, - "{peer} sends", - peer = peer; - "messages" => ?messages, - ); - assert!( - !messages.is_empty(), - "Expected peer {} to have messages at step {}.", - peer, - step - ); - self.dispatch(messages)?; - } - Ok(()) - } - - /// Simulate a power cycle in the given nodes. - /// - /// This means that the MemStorage is kept, but nothing else. - fn power_cycle<'a>( - &mut self, - peers: impl IntoIterator, - snapshot: impl Into>, - ) { - let peers = peers.into_iter().cloned(); - let snapshot = snapshot.into(); - for id in peers { - debug!(self.logger, "Power cycling {id}.", id = id); - let applied = self.peers[&id].raft_log.applied; - let mut peer = self.peers.remove(&id).expect("Peer did not exist."); - let store = peer.mut_store().clone(); - - let mut peer = Raft::new( - &Config { - id, - applied, - ..Default::default() - }, - store, - &self.logger, - ) - .expect("Could not create new Raft"); - - if let Some(ref snapshot) = snapshot { - peer.restore(snapshot.clone()); - }; - self.peers.insert(id, peer.into()); - } - } - - // Verify there is a transition entry at the given index of the given variant. - fn assert_membership_change_entry_at<'a>( - &self, - peers: impl IntoIterator, - index: u64, - entry_type: ConfChangeType, - ) { - let peers = peers.into_iter().cloned(); - for peer in peers { - let entry = &self.peers[&peer] - .raft_log - .slice(index, index + 1, None) - .unwrap()[0]; - assert_eq!(entry.get_entry_type(), EntryType::EntryConfChange); - let mut conf_change = ConfChange::default(); - conf_change.merge_from_bytes(&entry.data).unwrap(); - assert_eq!(conf_change.get_change_type(), entry_type); - } - } - - fn assert_can_apply_transition_entry_at_index<'a>( - &mut self, - peers: impl IntoIterator, - index: u64, - entry_type: ConfChangeType, - ) { - let peers = peers.into_iter().collect::>(); - self.expect_apply_membership_change_entry(peers.clone(), entry_type) - .unwrap(); - self.assert_membership_change_entry_at(peers, index, entry_type) - } -} - -fn conf_state<'a>( - voters: impl IntoIterator, - learners: impl IntoIterator, -) -> ConfState { - let voters = voters.into_iter().cloned().collect::>(); - let learners = learners.into_iter().cloned().collect::>(); - let mut conf_state = ConfState::default(); - conf_state.nodes = voters; - conf_state.learners = learners; - conf_state -} - -fn begin_conf_change<'a>( - voters: impl IntoIterator, - learners: impl IntoIterator, - index: u64, -) -> ConfChange { - let conf_state = conf_state(voters, learners); - let mut conf_change = ConfChange::default(); - conf_change.set_change_type(ConfChangeType::BeginMembershipChange); - conf_change.set_configuration(conf_state); - conf_change.start_index = index; - conf_change -} - -fn finalize_conf_change() -> ConfChange { - let mut conf_change = ConfChange::default(); - conf_change.set_change_type(ConfChangeType::FinalizeMembershipChange); - conf_change -} - -fn begin_entry<'a>( - voters: impl IntoIterator, - learners: impl IntoIterator, - index: u64, -) -> Entry { - let conf_change = begin_conf_change(voters, learners, index); - let data = conf_change.write_to_bytes().unwrap(); - let mut entry = Entry::default(); - entry.set_entry_type(EntryType::EntryConfChange); - entry.data = data; - entry.index = index; - entry -} - -fn build_propose_change_message<'a>( - recipient: u64, - voters: impl IntoIterator, - learners: impl IntoIterator, - index: u64, -) -> Message { - let begin_entry = begin_entry(voters, learners, index); - let mut message = Message::default(); - message.to = recipient; - message.set_msg_type(MessageType::MsgPropose); - message.index = index; - message.entries = vec![begin_entry].into(); - message -} - -fn build_propose_add_node_message(recipient: u64, added_id: u64, index: u64) -> Message { - let add_nodes_entry = { - let mut conf_change = ConfChange::default(); - conf_change.set_change_type(ConfChangeType::AddNode); - conf_change.node_id = added_id; - let data = conf_change.write_to_bytes().unwrap(); - let mut entry = Entry::default(); - entry.set_entry_type(EntryType::EntryConfChange); - entry.data = data; - entry.index = index; - entry - }; - let mut message = Message::default(); - message.to = recipient; - message.set_msg_type(MessageType::MsgPropose); - message.index = index; - message.entries = vec![add_nodes_entry].into(); - message -} diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index 146e52d91..273024b84 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -2798,7 +2798,7 @@ fn test_restore() { sm.prs().voter_ids(), s.get_metadata() .get_conf_state() - .nodes + .voters .iter() .cloned() .collect::>(), @@ -3782,7 +3782,7 @@ fn test_restore_with_learner() { assert_eq!(sm.prs().learners().count(), 1); let conf_state = s.get_metadata().get_conf_state(); - for &node in &conf_state.nodes { + for &node in &conf_state.voters { assert!(sm.prs().get(node).is_some()); assert!(!sm.prs().learner_ids().contains(&node)); } diff --git a/harness/tests/integration_cases/test_raw_node.rs b/harness/tests/integration_cases/test_raw_node.rs index f6ee5daeb..8442f1d5f 100644 --- a/harness/tests/integration_cases/test_raw_node.rs +++ b/harness/tests/integration_cases/test_raw_node.rs @@ -304,7 +304,7 @@ fn test_raw_node_propose_add_learner_node() -> Result<()> { let mut conf_change = ConfChange::default(); conf_change.merge_from_bytes(&e.data).unwrap(); let conf_state = raw_node.apply_conf_change(&conf_change)?; - assert_eq!(conf_state.nodes, vec![1]); + assert_eq!(conf_state.voters, vec![1]); assert_eq!(conf_state.learners, vec![2]); Ok(()) diff --git a/harness/tests/test_util/mod.rs b/harness/tests/test_util/mod.rs index 5e96d0ece..663756de3 100644 --- a/harness/tests/test_util/mod.rs +++ b/harness/tests/test_util/mod.rs @@ -167,10 +167,10 @@ pub fn empty_entry(term: u64, index: u64) -> Entry { new_entry(term, index, None) } -pub fn new_snapshot(index: u64, term: u64, nodes: Vec) -> Snapshot { +pub fn new_snapshot(index: u64, term: u64, voters: Vec) -> Snapshot { let mut s = Snapshot::default(); s.mut_metadata().index = index; s.mut_metadata().term = term; - s.mut_metadata().mut_conf_state().nodes = nodes; + s.mut_metadata().mut_conf_state().voters = voters; s } diff --git a/proto/proto/eraftpb.proto b/proto/proto/eraftpb.proto index 21564a03b..18467d636 100644 --- a/proto/proto/eraftpb.proto +++ b/proto/proto/eraftpb.proto @@ -4,6 +4,7 @@ package eraftpb; enum EntryType { EntryNormal = 0; EntryConfChange = 1; + EntryConfChangeV2 = 2; } // The entry is a type of change that needs to be applied. It contains two data fields. @@ -31,14 +32,6 @@ message Entry { message SnapshotMetadata { // The current `ConfState`. ConfState conf_state = 1; - // The index of the current `ConfState`. - uint64 conf_state_index = 6; - - // The next `ConfState`, only set for membership change. - ConfState next_conf_state = 4; - // The index of the next `ConfState`. - uint64 next_conf_state_index = 5; - // The applied index. uint64 index = 2; // The term of the applied index. @@ -94,31 +87,94 @@ message HardState { uint64 commit = 3; } +enum ConfChangeTransition { + // Automatically use the simple protocol if possible, otherwise fall back + // to ConfChangeType::Implicit. Most applications will want to use this. + Auto = 0; + // Use joint consensus unconditionally, and transition out of them + // automatically (by proposing a zero configuration change). + // + // This option is suitable for applications that want to minimize the time + // spent in the joint configuration and do not store the joint configuration + // in the state machine (outside of InitialState). + Implicit = 1; + // Use joint consensus and remain in the joint configuration until the + // application proposes a no-op configuration change. This is suitable for + // applications that want to explicitly control the transitions, for example + // to use a custom payload (via the Context field). + Explicit = 2; +} + message ConfState { - repeated uint64 nodes = 1; + repeated uint64 voters = 1; repeated uint64 learners = 2; + + // The voters in the outgoing config. If not empty the node is in joint consensus. + repeated uint64 voters_outgoing = 3; + // The nodes that will become learners when the outgoing config is removed. + // These nodes are necessarily currently in nodes_joint (or they would have + // been added to the incoming config right away). + repeated uint64 learners_next = 4; + // If set, the config is joint and Raft will automatically transition into + // the final config (i.e. remove the outgoing config) when this is safe. + bool auto_leave = 5; } enum ConfChangeType { AddNode = 0; RemoveNode = 1; AddLearnerNode = 2; - BeginMembershipChange = 3; - FinalizeMembershipChange = 4; } message ConfChange { - uint64 id = 1; ConfChangeType change_type = 2; - // Used in `AddNode`, `RemoveNode`, and `AddLearnerNode`. uint64 node_id = 3; - - // Some extra information used in configuration change. bytes context = 4; - // For `BeginMembershipChange`, the target configuration is stored in the field. - ConfState configuration = 5; + uint64 id = 1; +} - // For `FinalizeMembershipChange`, it's the index of the pending membership change entry. - uint64 start_index = 6; +// ConfChangeSingle is an individual configuration change operation. Multiple +// such operations can be carried out atomically via a ConfChangeV2. +message ConfChangeSingle { + ConfChangeType type = 1; + uint64 node_id = 2; +} + +// ConfChangeV2 messages initiate configuration changes. They support both the +// simple "one at a time" membership change protocol and full Joint Consensus +// allowing for arbitrary changes in membership. +// +// The supplied context is treated as an opaque payload and can be used to +// attach an action on the state machine to the application of the config change +// proposal. Note that contrary to Joint Consensus as outlined in the Raft +// paper[1], configuration changes become active when they are *applied* to the +// state machine (not when they are appended to the log). +// +// The simple protocol can be used whenever only a single change is made. +// +// Non-simple changes require the use of Joint Consensus, for which two +// configuration changes are run. The first configuration change specifies the +// desired changes and transitions the Raft group into the joint configuration, +// in which quorum requires a majority of both the pre-changes and post-changes +// configuration. Joint Consensus avoids entering fragile intermediate +// configurations that could compromise survivability. For example, without the +// use of Joint Consensus and running across three availability zones with a +// replication factor of three, it is not possible to replace a voter without +// entering an intermediate configuration that does not survive the outage of +// one availability zone. +// +// The provided ConfChangeTransition specifies how (and whether) Joint Consensus +// is used, and assigns the task of leaving the joint configuration either to +// Raft or the application. Leaving the joint configuration is accomplished by +// proposing a ConfChangeV2 with only and optionally the Context field +// populated. +// +// For details on Raft membership changes, see: +// +// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf +message ConfChangeV2 { + ConfChangeTransition transition = 1; + repeated ConfChangeSingle changes = 2; + bytes context = 3; } diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 8f5437e5f..92f0f7038 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -13,22 +13,13 @@ mod protos { pub mod prelude { pub use crate::eraftpb::{ - ConfChange, ConfChangeType, ConfState, Entry, EntryType, HardState, Message, MessageType, - Snapshot, SnapshotMetadata, + ConfChange, ConfChangeSingle, ConfChangeTransition, ConfChangeType, ConfChangeV2, + ConfState, Entry, EntryType, HardState, Message, MessageType, Snapshot, SnapshotMetadata, }; } pub mod util { - use crate::eraftpb::{ConfChange, ConfChangeType, ConfState}; - - // Bring some consistency to things. The protobuf has `nodes` and it's not really a term that's used anymore. - impl ConfState { - /// Get the voters. This is identical to `nodes`. - #[inline] - pub fn get_voters(&self) -> &[u64] { - &self.nodes - } - } + use crate::eraftpb::ConfState; impl From<(Iter1, Iter2)> for ConfState where @@ -37,19 +28,9 @@ pub mod util { { fn from((voters, learners): (Iter1, Iter2)) -> Self { let mut conf_state = ConfState::default(); - conf_state.mut_nodes().extend(voters.into_iter()); + conf_state.mut_voters().extend(voters.into_iter()); conf_state.mut_learners().extend(learners.into_iter()); conf_state } } - - impl From<(u64, ConfState)> for ConfChange { - fn from((start_index, state): (u64, ConfState)) -> Self { - let mut change = ConfChange::default(); - change.set_change_type(ConfChangeType::BeginMembershipChange); - change.set_configuration(state); - change.start_index = start_index; - change - } - } } diff --git a/src/errors.rs b/src/errors.rs index dd9de08a2..569f98bf9 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -11,7 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::StateRole; use protobuf::ProtobufError; use std::error; use std::{cmp, io, result}; @@ -63,18 +62,6 @@ quick_error! { NotExists(id: u64, set: &'static str) { display("The node {} is not in the {} set.", id, set) } - /// The action given requires the node to be in a particular state role. - InvalidState(role: StateRole) { - display("Cannot complete that action while in {:?} role.", role) - } - /// The node attempted to transition to a new membership configuration while there was none pending. - NoPendingMembershipChange { - display("No pending membership change. Create a pending transition with `Raft::propose_membership_change` on the leader.") - } - /// An argument violates a calling contract. - ViolatesContract(contract: String) { - display("An argument violate a calling contract: {}", contract) - } /// The request snapshot is dropped. RequestSnapshotDropped { description("raft: request snapshot dropped") diff --git a/src/lib.rs b/src/lib.rs index 07feaea34..7430cc380 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -365,6 +365,7 @@ need to update the applied index and resume `apply` later: match entry.get_entry_type() { EntryType::EntryNormal => handle_normal(entry), EntryType::EntryConfChange => handle_conf_change(entry), + EntryType::EntryConfChangeV2 => unimplemented!(), } } } @@ -434,39 +435,6 @@ let mut node = RawNode::new(&mut config, store, &logger).unwrap(); node.raft.become_candidate(); node.raft.become_leader(); -// Call this on the leader, or send the command via a normal `MsgPropose`. -node.raft.propose_membership_change(( - // Any IntoIterator. - // Voters - vec![1,3], // Remove 2, add 3. - // Learners - vec![4,5,6], // Add 4, 5, 6. -)).unwrap(); -# let idx = node.raft.raft_log.last_index(); - -# let entry = &node.raft.raft_log.entries(idx, 1).unwrap()[0]; -// ...Later when the begin entry is recieved from a `ready()` in the `entries` field... -let mut conf_change = ConfChange::default(); -conf_change.merge_from_bytes(&entry.data).unwrap(); -node.raft.begin_membership_change(&conf_change).unwrap(); -assert!(node.raft.is_in_membership_change()); -assert!(node.raft.prs().voter_ids().contains(&2)); -assert!(node.raft.prs().voter_ids().contains(&3)); -# -# // We hide this since the user isn't really encouraged to blindly call this, but we'd like a short -# // example. -# node.raft.raft_log.commit_to(idx); -# node.raft.commit_apply(idx); -# -# let idx = node.raft.raft_log.last_index(); -# let entry = &node.raft.raft_log.entries(idx, 1).unwrap()[0]; -// ...Later, when the finalize entry is recieved from a `ready()` in the `entries` field... -let mut conf_change = ConfChange::default(); -conf_change.merge_from_bytes(&entry.data).unwrap(); -node.raft.finalize_membership_change(&conf_change).unwrap(); -assert!(!node.raft.prs().voter_ids().contains(&2)); -assert!(node.raft.prs().voter_ids().contains(&3)); -assert!(!node.raft.is_in_membership_change()); ``` This process is a two-phase process, during the midst of it the peer group's leader is managing diff --git a/src/progress/progress_set.rs b/src/progress/progress_set.rs index 756eb125b..8077a2cb9 100644 --- a/src/progress/progress_set.rs +++ b/src/progress/progress_set.rs @@ -68,7 +68,7 @@ impl Configuration { /// Create a new `ConfState` from the configuration itself. pub fn to_conf_state(&self) -> ConfState { let mut state = ConfState::default(); - state.set_nodes(self.voters.iter().cloned().collect()); + state.set_voters(self.voters.iter().cloned().collect()); state.set_learners(self.learners.iter().cloned().collect()); state } @@ -76,7 +76,7 @@ impl Configuration { /// Create a new `Configuration` from a given `ConfState`. pub fn from_conf_state(conf_state: &ConfState) -> Self { Self { - voters: conf_state.nodes.iter().cloned().collect(), + voters: conf_state.voters.iter().cloned().collect(), learners: conf_state.learners.iter().cloned().collect(), } } @@ -154,10 +154,6 @@ pub struct ProgressSet { #[get = "pub"] configuration: Configuration, - /// The pending configuration, which will be adopted after the Finalize entry is applied. - #[get = "pub"] - next_configuration: Option, - // A preallocated buffer for sorting in the maximal_committed_index function. // You should not depend on these values unless you just set them. // We use a cell to avoid taking a `&mut self`. @@ -180,7 +176,6 @@ impl ProgressSet { ), sort_buffer: RefCell::from(Vec::with_capacity(voters)), configuration: Configuration::with_capacity(voters, learners), - next_configuration: Option::default(), logger, } } @@ -189,7 +184,6 @@ impl ProgressSet { self.progress.clear(); self.configuration.voters.clear(); self.configuration.learners.clear(); - self.next_configuration = None; } pub(crate) fn restore_snapmeta( @@ -200,7 +194,7 @@ impl ProgressSet { ) { self.clear(); let pr = Progress::new(next_idx, max_inflight); - for id in &meta.conf_state.as_ref().unwrap().nodes { + for id in &meta.conf_state.as_ref().unwrap().voters { self.progress.insert(*id, pr.clone()); self.configuration.voters.insert(*id); } @@ -209,20 +203,6 @@ impl ProgressSet { self.configuration.learners.insert(*id); } - if meta.next_conf_state_index != 0 { - let voters = &meta.next_conf_state.as_ref().unwrap().nodes; - let learners = &meta.next_conf_state.as_ref().unwrap().learners; - let mut configuration = Configuration::with_capacity(voters.len(), learners.len()); - for id in voters { - self.progress.insert(*id, pr.clone()); - configuration.voters.insert(*id); - } - for id in learners { - self.progress.insert(*id, pr.clone()); - configuration.learners.insert(*id); - } - self.next_configuration = Some(configuration); - } self.assert_progress_and_configuration_consistent(); } @@ -276,15 +256,7 @@ impl ProgressSet { /// transitioning to a new configuration and have two qourums. Use `has_quorum` instead. #[inline] pub fn voter_ids(&self) -> HashSet { - match self.next_configuration { - Some(ref next) => self - .configuration - .voters - .union(&next.voters) - .cloned() - .collect::>(), - None => self.configuration.voters.clone(), - } + self.configuration().voters().clone() } /// Returns the ids of all known learners. @@ -293,15 +265,7 @@ impl ProgressSet { /// transitioning to a new configuration and have two qourums. Use `has_quorum` instead. #[inline] pub fn learner_ids(&self) -> HashSet { - match self.next_configuration { - Some(ref next) => self - .configuration - .learners - .union(&next.learners) - .cloned() - .collect::>(), - None => self.configuration.learners.clone(), - } + self.configuration().learners().clone() } /// Grabs a reference to the progress of a node. @@ -340,7 +304,6 @@ impl ProgressSet { /// /// * `id` is in the voter set. /// * `id` is in the learner set. - /// * There is a pending membership change. pub fn insert_voter(&mut self, id: u64, pr: Progress) -> Result<()> { debug!(self.logger, "Inserting voter with id {id}", id = id); @@ -348,10 +311,6 @@ impl ProgressSet { return Err(Error::Exists(id, "learners")); } else if self.voter_ids().contains(&id) { return Err(Error::Exists(id, "voters")); - } else if self.is_in_membership_change() { - return Err(Error::ViolatesContract( - "There is a pending membership change.".into(), - )); } self.configuration.voters.insert(id); @@ -366,7 +325,6 @@ impl ProgressSet { /// /// * `id` is in the voter set. /// * `id` is in the learner set. - /// * There is a pending membership change. pub fn insert_learner(&mut self, id: u64, pr: Progress) -> Result<()> { debug!(self.logger, "Inserting learner with id {id}", id = id); @@ -374,10 +332,6 @@ impl ProgressSet { return Err(Error::Exists(id, "learners")); } else if self.voter_ids().contains(&id) { return Err(Error::Exists(id, "voters")); - } else if self.is_in_membership_change() { - return Err(Error::ViolatesContract( - "There is a pending membership change".into(), - )); } self.configuration.learners.insert(id); @@ -390,16 +344,8 @@ impl ProgressSet { /// /// # Errors /// - /// * There is a pending membership change. pub fn remove(&mut self, id: u64) -> Result> { debug!(self.logger, "Removing peer with id {id}", id = id); - - if self.is_in_membership_change() { - return Err(Error::ViolatesContract( - "There is a pending membership change.".into(), - )); - } - self.configuration.learners.remove(&id); self.configuration.voters.remove(&id); let removed = self.progress.remove(&id); @@ -412,12 +358,6 @@ impl ProgressSet { pub fn promote_learner(&mut self, id: u64) -> Result<()> { debug!(self.logger, "Promoting peer with id {id}", id = id); - if self.is_in_membership_change() { - return Err(Error::ViolatesContract( - "There is a pending membership change.".into(), - )); - } - if !self.configuration.learners.remove(&id) { // Wasn't already a learner. We can't promote what doesn't exist. return Err(Error::NotExists(id, "learners")); @@ -438,19 +378,6 @@ impl ProgressSet { .voters .union(&self.configuration.learners) .all(|v| self.progress.contains_key(v))); - debug_assert!(self - .progress - .keys() - .all(|v| self.configuration.learners.contains(v) - || self.configuration.voters.contains(v) - || self - .next_configuration - .as_ref() - .map_or(false, |c| c.learners.contains(v)) - || self - .next_configuration - .as_ref() - .map_or(false, |c| c.voters.contains(v)))); assert_eq!( self.voter_ids().len() + self.learner_ids().len(), self.progress.len() @@ -469,22 +396,7 @@ impl ProgressSet { }); // Reverse sort. matched.sort_by(|a, b| b.cmp(a)); - let mut mci = matched[matched.len() / 2]; - - if let Some(next) = &self.next_configuration { - matched.clear(); - next.voters().iter().for_each(|id| { - let peer = &self.progress[id]; - matched.push(peer.matched); - }); - // Reverse sort. - matched.sort_by(|a, b| b.cmp(a)); - let next_mci = matched[matched.len() / 2]; - if next_mci < mci { - mci = next_mci; - } - } - mci + matched[matched.len() / 2] } /// Returns the Candidate's eligibility in the current election. @@ -508,23 +420,11 @@ impl ProgressSet { }, ); - match self.next_configuration { - Some(ref next) => { - if next.has_quorum(&accepts) && self.configuration.has_quorum(&accepts) { - return CandidacyStatus::Elected; - } else if next.has_quorum(&rejects) || self.configuration.has_quorum(&rejects) { - return CandidacyStatus::Ineligible; - } - } - None => { - if self.configuration.has_quorum(&accepts) { - return CandidacyStatus::Elected; - } else if self.configuration.has_quorum(&rejects) { - return CandidacyStatus::Ineligible; - } - } - }; - + if self.configuration.has_quorum(&accepts) { + return CandidacyStatus::Elected; + } else if self.configuration.has_quorum(&rejects) { + return CandidacyStatus::Ineligible; + } CandidacyStatus::Eligible } @@ -542,14 +442,11 @@ impl ProgressSet { if pr.recent_active { active.insert(id); } - pr.recent_active = false; } - for (&_id, pr) in self.learners_mut() { + for pr in self.progress.values_mut() { pr.recent_active = false; } - self.configuration.has_quorum(&active) && - // If `next` is `None` we don't consider it, so just `true` it. - self.next_configuration.as_ref().map(|next| next.has_quorum(&active)).unwrap_or(true) + self.configuration.has_quorum(&active) } /// Determine if a quorum is formed from the given set of nodes. @@ -558,96 +455,6 @@ impl ProgressSet { #[inline] pub fn has_quorum(&self, potential_quorum: &HashSet) -> bool { self.configuration.has_quorum(potential_quorum) - && self - .next_configuration - .as_ref() - .map(|next| next.has_quorum(potential_quorum)) - // If `next` is `None` we don't consider it, so just `true` it. - .unwrap_or(true) - } - - /// Determine if the ProgressSet is represented by a transition state under Joint Consensus. - #[inline] - pub fn is_in_membership_change(&self) -> bool { - self.next_configuration.is_some() - } - - /// Enter a joint consensus state to transition to the specified configuration. - /// - /// The `next` provided should be derived from the `ConfChange` message. `progress` is used as - /// a basis for created peer `Progress` values. You are only expected to set `ins` from the - /// `raft.max_inflights` value. - /// - /// Once this state is entered the leader should replicate the `ConfChange` message. After the - /// majority of nodes, in both the current and the `next`, have committed the union state. At - /// this point the leader can call `finalize_config_transition` and replicate a message - /// commiting the change. - /// - /// Valid transitions: - /// * Non-existing -> Learner - /// * Non-existing -> Voter - /// * Learner -> Voter - /// * Learner -> Non-existing - /// * Voter -> Non-existing - /// - /// Errors: - /// * Voter -> Learner - /// * Member as voter and learner. - /// * Empty voter set. - pub(crate) fn begin_membership_change( - &mut self, - next: Configuration, - mut progress: Progress, - ) -> Result<()> { - next.valid()?; - // Demotion check. - if let Some(&demoted) = self - .configuration - .voters - .intersection(&next.learners) - .next() - { - return Err(Error::Exists(demoted, "learners")); - } - debug!( - self.logger, - "Beginning membership change"; - "next" => ?next, - ); - - // When a peer is first added/promoted, we should mark it as recently active. - // Otherwise, check_quorum may cause us to step down if it is invoked - // before the added peer has a chance to communicate with us. - progress.recent_active = true; - for id in next.voters.iter().chain(&next.learners) { - self.progress.entry(*id).or_insert_with(|| progress.clone()); - } - self.next_configuration = Some(next); - Ok(()) - } - - /// Finalizes the joint consensus state and transitions solely to the new state. - /// - /// This must be called only after calling `begin_membership_change` and after the majority - /// of peers in both the `current` and the `next` state have committed the changes. - pub fn finalize_membership_change(&mut self) -> Result<()> { - let next = self.next_configuration.take(); - match next { - None => Err(Error::NoPendingMembershipChange), - Some(next) => { - self.progress.retain(|id, _| next.contains(*id)); - if self.progress.capacity() >= (self.progress.len() << 1) { - self.progress.shrink_to_fit(); - } - self.configuration = next; - debug!( - self.logger, - "Finalizing membership change"; - "config" => ?self.configuration, - ); - Ok(()) - } - } } } @@ -655,10 +462,9 @@ impl ProgressSet { // See https://github.com/pingcap/raft-rs/issues/125 #[cfg(test)] mod test_progress_set { - use super::{Configuration, ProgressSet, Result}; + use super::{ProgressSet, Result}; use crate::default_logger; use crate::progress::Progress; - use crate::HashSet; const CANARY: u64 = 123; @@ -755,101 +561,4 @@ mod test_progress_set { assert_eq!(pre, *set.get(1).expect("Peer should not have been deleted")); Ok(()) } - - #[test] - fn test_membership_change_configuration_remove_voter() -> Result<()> { - check_membership_change_configuration((vec![1, 2], vec![]), (vec![1], vec![])) - } - - #[test] - fn test_membership_change_configuration_remove_learner() -> Result<()> { - check_membership_change_configuration((vec![1], vec![2]), (vec![1], vec![])) - } - - #[test] - fn test_membership_change_configuration_conflicting_sets() { - assert!( - check_membership_change_configuration((vec![1], vec![]), (vec![1], vec![1]),).is_err() - ) - } - - #[test] - fn test_membership_change_configuration_empty_sets() { - assert!(check_membership_change_configuration((vec![], vec![]), (vec![], vec![])).is_err()) - } - - #[test] - fn test_membership_change_configuration_empty_voters() { - assert!( - check_membership_change_configuration((vec![1], vec![]), (vec![], vec![]),).is_err() - ) - } - - #[test] - fn test_membership_change_configuration_add_voter() -> Result<()> { - check_membership_change_configuration((vec![1], vec![]), (vec![1, 2], vec![])) - } - - #[test] - fn test_membership_change_configuration_add_learner() -> Result<()> { - check_membership_change_configuration((vec![1], vec![]), (vec![1], vec![2])) - } - - #[test] - fn test_membership_change_configuration_promote_learner() -> Result<()> { - check_membership_change_configuration((vec![1], vec![2]), (vec![1, 2], vec![])) - } - - fn check_membership_change_configuration( - start: (impl IntoIterator, impl IntoIterator), - end: (impl IntoIterator, impl IntoIterator), - ) -> Result<()> { - let start_voters = start.0.into_iter().collect::>(); - let start_learners = start.1.into_iter().collect::>(); - let end_voters = end.0.into_iter().collect::>(); - let end_learners = end.1.into_iter().collect::>(); - let transition_voters = start_voters - .union(&end_voters) - .cloned() - .collect::>(); - let transition_learners = start_learners - .union(&end_learners) - .cloned() - .collect::>(); - - let mut set = ProgressSet::new(default_logger()); - let default_progress = Progress::new(0, 10); - - for starter in start_voters { - set.insert_voter(starter, default_progress.clone())?; - } - for starter in start_learners { - set.insert_learner(starter, default_progress.clone())?; - } - set.begin_membership_change( - Configuration::new(end_voters.clone(), end_learners.clone()), - default_progress, - )?; - assert!(set.is_in_membership_change()); - assert_eq!( - set.voter_ids(), - transition_voters, - "Transition state voters inaccurate" - ); - assert_eq!( - set.learner_ids(), - transition_learners, - "Transition state learners inaccurate." - ); - - set.finalize_membership_change()?; - assert!(!set.is_in_membership_change()); - assert_eq!(set.voter_ids(), end_voters, "End state voters inaccurate"); - assert_eq!( - set.learner_ids(), - end_learners, - "End state learners inaccurate" - ); - Ok(()) - } } diff --git a/src/raft.rs b/src/raft.rs index 467b31226..9f68ddbd7 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -27,20 +27,17 @@ use std::cmp; -use protobuf::Message as PbMessage; +use crate::eraftpb::{Entry, EntryType, HardState, Message, MessageType, Snapshot}; use rand::{self, Rng}; use slog::{self, Logger}; use super::errors::{Error, Result, StorageError}; -use super::progress::progress_set::{CandidacyStatus, Configuration, ProgressSet}; +use super::progress::progress_set::{CandidacyStatus, ProgressSet}; use super::progress::{Progress, ProgressState}; use super::raft_log::RaftLog; use super::read_only::{ReadOnly, ReadOnlyOption, ReadState}; use super::storage::Storage; use super::Config; -use crate::eraftpb::{ - ConfChange, ConfChangeType, Entry, EntryType, HardState, Message, MessageType, Snapshot, -}; use crate::util; use crate::{HashMap, HashSet}; @@ -154,22 +151,6 @@ pub struct Raft { /// we set this to one. pub pending_conf_index: u64, - /// The last `BeginMembershipChange` entry. Once we make this change we exit the joint state. - /// - /// This is different than `pending_conf_index` since it is more specific, and also exact. - /// While `pending_conf_index` is conservatively set at times to ensure safety in the - /// one-by-one change method, in joint consensus based changes we track the state exactly. The - /// index here **must** only be set when a `BeginMembershipChange` is present at that index. - /// - /// # Caveats - /// - /// It is important that whenever this is set that `pending_conf_index` is also set to the - /// value if it is greater than the existing value. - /// - /// **Use `Raft::set_pending_membership_change()` to change this value.** - #[get = "pub"] - pending_membership_change: Option, - /// The queue of read-only requests. pub read_only: ReadOnly, @@ -240,7 +221,7 @@ impl Raft { let logger = logger.new(o!("raft_id" => c.id)); let raft_state = store.initial_state()?; let conf_state = &raft_state.conf_state; - let peers = &conf_state.nodes; + let voters = &conf_state.voters; let learners = &conf_state.learners; let mut r = Raft { @@ -250,7 +231,7 @@ impl Raft { max_inflight: c.max_inflight_msgs, max_msg_size: c.max_size_per_msg, prs: Some(ProgressSet::with_capacity( - peers.len(), + voters.len(), learners.len(), logger.clone(), )), @@ -269,7 +250,6 @@ impl Raft { term: Default::default(), election_elapsed: Default::default(), pending_conf_index: Default::default(), - pending_membership_change: Default::default(), vote: Default::default(), heartbeat_elapsed: Default::default(), randomized_election_timeout: 0, @@ -279,7 +259,7 @@ impl Raft { batch_append: c.batch_append, logger, }; - for p in peers { + for p in voters { let pr = Progress::new(1, r.max_inflight); if let Err(e) = r.mut_prs().insert_voter(*p, pr) { fatal!(r.logger, "{}", e); @@ -304,17 +284,6 @@ impl Raft { let term = r.term; r.become_follower(term, INVALID_ID); - // Used to resume Joint Consensus Changes - let pending_conf_state = raft_state.pending_conf_state(); - let pending_conf_state_start_index = raft_state.pending_conf_state_start_index(); - match (pending_conf_state, pending_conf_state_start_index) { - (Some(state), Some(idx)) => { - r.begin_membership_change(&ConfChange::from((*idx, state.clone())))?; - } - (None, None) => (), - _ => unreachable!("Should never find pending_conf_change without an index."), - }; - info!( r.logger, "newRaft"; @@ -324,7 +293,6 @@ impl Raft { "last index" => r.raft_log.last_index(), "last term" => r.raft_log.last_term(), "peers" => ?r.prs().voters().collect::>(), - "pending membership change" => ?r.pending_membership_change(), ); Ok(r) } @@ -423,32 +391,6 @@ impl Raft { self.skip_bcast_commit = skip; } - /// Set when the peer began a joint consensus change. - /// - /// This will also set `pending_conf_index` if it is larger than the existing number. - #[inline] - fn set_pending_membership_change(&mut self, maybe_change: impl Into>) { - let maybe_change = maybe_change.into(); - if let Some(ref change) = maybe_change { - let index = change.start_index; - assert!(self.pending_membership_change.is_none() || index == self.pending_conf_index); - if index > self.pending_conf_index { - self.pending_conf_index = index; - } - } - self.pending_membership_change = maybe_change.clone(); - } - - /// Get the index which the pending membership change started at. - /// - /// > **Note:** This is an experimental feature. - #[inline] - pub fn began_membership_change_at(&self) -> Option { - self.pending_membership_change - .as_ref() - .map(|c| c.start_index) - } - /// Set whether batch append msg at runtime. #[inline] pub fn set_batch_append(&mut self, batch_append: bool) { @@ -710,33 +652,6 @@ impl Raft { pub fn commit_apply(&mut self, applied: u64) { #[allow(deprecated)] self.raft_log.applied_to(applied); - - // Check to see if we need to finalize a Joint Consensus state now. - let start_index = self - .pending_membership_change - .as_ref() - .map(|v| Some(v.start_index)) - .unwrap_or(None); - - if let Some(index) = start_index { - // Invariant: We know that if we have committed past some index, we can also commit that index. - if applied >= index && self.state == StateRole::Leader { - // We must replicate the commit entry. - self.append_finalize_conf_change_entry(); - } - } - } - - fn append_finalize_conf_change_entry(&mut self) { - let mut conf_change = ConfChange::default(); - conf_change.set_change_type(ConfChangeType::FinalizeMembershipChange); - let data = conf_change.write_to_bytes().unwrap(); - let mut entry = Entry::default(); - entry.set_entry_type(EntryType::EntryConfChange); - entry.data = data; - // Index/Term set here. - self.append_entry(&mut [entry]); - self.bcast_append(); } /// Resets the current node to a given term. @@ -942,28 +857,6 @@ impl Raft { self.append_entry(&mut [Entry::default()]); - // In most cases, we append only a new entry marked with an index and term. - // In the specific case of a node recovering while in the middle of a membership change, - // and the finalization entry may have been lost, we must also append that, since it - // would be overwritten by the term change. - let change_start_index = self - .pending_membership_change - .as_ref() - .map(|v| Some(v.start_index)) - .unwrap_or(None); - if let Some(index) = change_start_index { - trace!( - self.logger, - "Checking if we need to finalize again..., began: {began}, applied: {applied}, committed: {committed}", - began = index, - applied = self.raft_log.applied, - committed = self.raft_log.committed - ); - if index <= self.raft_log.committed { - self.append_finalize_conf_change_entry(); - } - } - info!( self.logger, "became leader at term {term}", @@ -1288,105 +1181,6 @@ impl Raft { } } - /// Apply a `BeginMembershipChange` variant `ConfChange`. - /// - /// > **Note:** This is an experimental feature. - /// - /// When a Raft node applies this variant of a configuration change it will adopt a joint - /// configuration state until the membership change is finalized. - /// - /// During this time the `Raft` will have two, possibly overlapping, cooperating quorums for - /// both elections and log replication. - /// - /// # Errors - /// - /// * `ConfChange.change_type` is not `BeginMembershipChange` - /// * `ConfChange.configuration` does not exist. - /// * `ConfChange.start_index` does not exist. It **must** equal the index of the - /// corresponding entry. - #[inline(always)] - pub fn begin_membership_change(&mut self, conf_change: &ConfChange) -> Result<()> { - if conf_change.get_change_type() != ConfChangeType::BeginMembershipChange { - return Err(Error::ViolatesContract(format!( - "{:?} != BeginMembershipChange", - conf_change.change_type - ))); - } - if !conf_change.has_configuration() { - return Err(Error::ViolatesContract( - "!ConfChange::has_configuration()".into(), - )); - } - if conf_change.start_index == 0 { - return Err(Error::ViolatesContract( - "!ConfChange::has_start_index()".into(), - )); - }; - - self.set_pending_membership_change(conf_change.clone()); - let pr = Progress::new(self.raft_log.last_index() + 1, self.max_inflight); - self.mut_prs().begin_membership_change( - Configuration::from_conf_state(conf_change.get_configuration()), - pr, - )?; - Ok(()) - } - - /// Apply a `FinalizeMembershipChange` variant `ConfChange`. - /// - /// > **Note:** This is an experimental feature. - /// - /// When a Raft node applies this variant of a configuration change it will finalize the - /// transition begun by [`begin_membership_change`]. - /// - /// Once this is called the Raft will no longer have two, possibly overlapping, cooperating - /// qourums. - /// - /// # Errors - /// - /// * This Raft is not in a configuration change via `begin_membership_change`. - /// * `ConfChange.change_type` is not a `FinalizeMembershipChange`. - /// * `ConfChange.configuration` value should not exist. - /// * `ConfChange.start_index` value should not exist. - #[inline(always)] - pub fn finalize_membership_change(&mut self, conf_change: &ConfChange) -> Result<()> { - if conf_change.get_change_type() != ConfChangeType::FinalizeMembershipChange { - return Err(Error::ViolatesContract(format!( - "{:?} != BeginMembershipChange", - conf_change.change_type - ))); - } - if conf_change.has_configuration() { - return Err(Error::ViolatesContract( - "ConfChange::has_configuration()".into(), - )); - }; - let leader_in_new_set = self - .prs() - .next_configuration() - .as_ref() - .map(|config| config.contains(self.leader_id)) - .ok_or_else(|| Error::NoPendingMembershipChange)?; - - // Joint Consensus, in the Raft paper, states the leader should step down and become a - // follower if it is removed during a transition. - if !leader_in_new_set { - let last_term = self.raft_log.last_term(); - if self.state == StateRole::Leader { - self.become_follower(last_term, INVALID_ID); - } else { - // It's no longer safe to lookup the ID in the ProgressSet, remove it. - self.leader_id = INVALID_ID; - } - } - - self.mut_prs().finalize_membership_change()?; - // Ensure we reset this on *any* node, since the leader might have failed - // and we don't want to finalize twice. - self.set_pending_membership_change(None); - Ok(()) - } - fn log_vote_approve(&self, m: &Message) { info!( self.logger, @@ -2209,14 +2003,6 @@ impl Raft { } self.prs = Some(prs); - if meta.next_conf_state_index > 0 { - let cs = meta.get_next_conf_state().clone(); - let mut conf_change = ConfChange::default(); - conf_change.set_change_type(ConfChangeType::BeginMembershipChange); - conf_change.set_configuration(cs); - conf_change.start_index = meta.next_conf_state_index; - self.pending_membership_change = Some(conf_change); - } self.pending_request_snapshot = INVALID_INDEX; None } @@ -2240,7 +2026,7 @@ impl Raft { /// This method can be false positive. #[inline] pub fn has_pending_conf(&self) -> bool { - self.pending_conf_index > self.raft_log.applied || self.pending_membership_change.is_some() + self.pending_conf_index > self.raft_log.applied } /// Specifies if the commit should be broadcast. @@ -2254,76 +2040,10 @@ impl Raft { self.promotable } - /// Propose that the peer group change its active set to a new set. - /// - /// > **Note:** This is an experimental feature. - /// - /// ```rust - /// use slog::{Drain, o}; - /// use raft::{Raft, Config, storage::MemStorage, eraftpb::ConfState}; - /// use raft::raw_node::RawNode; - /// use raft::Configuration; - /// let config = Config { - /// id: 1, - /// ..Default::default() - /// }; - /// let store = MemStorage::new_with_conf_state((vec![1], vec![])); - /// let logger = slog::Logger::root(slog_stdlog::StdLog.fuse(), o!()); - /// let mut node = RawNode::new(&config, store, &logger).unwrap(); - /// let mut raft = node.raft; - /// raft.become_candidate(); - /// raft.become_leader(); // It must be a leader! - /// - /// let mut conf = ConfState::default(); - /// conf.nodes = vec![1,2,3]; - /// conf.learners = vec![4]; - /// if let Err(e) = raft.propose_membership_change(Configuration::from_conf_state(&conf)) { - /// panic!("{}", e); - /// } - /// ``` - /// - /// # Errors - /// - /// * This Peer is not leader. - /// * `voters` and `learners` are not mutually exclusive. - /// * `voters` is empty. - pub fn propose_membership_change(&mut self, config: impl Into) -> Result<()> { - if self.state != StateRole::Leader { - return Err(Error::InvalidState(self.state)); - } - let config = config.into(); - config.valid()?; - debug!( - self.logger, - "replicating SetNodes"; - "voters" => ?config.voters(), - "learners" => ?config.learners(), - ); - let destination_index = self.raft_log.last_index() + 1; - // Prep a configuration change to append. - let mut conf_change = ConfChange::default(); - conf_change.set_change_type(ConfChangeType::BeginMembershipChange); - conf_change.set_configuration(config.to_conf_state()); - conf_change.start_index = destination_index; - let data = conf_change.write_to_bytes()?; - let mut entry = Entry::default(); - entry.set_entry_type(EntryType::EntryConfChange); - entry.data = data; - let mut message = Message::default(); - message.set_msg_type(MessageType::MsgPropose); - message.from = self.id; - message.index = destination_index; - message.set_entries(vec![entry].into()); - // `append_entry` sets term, index for us. - self.step(message)?; - Ok(()) - } - /// # Errors /// /// * `id` is already a voter. /// * `id` is already a learner. - /// * There is a pending membership change. (See `is_in_membership_change()`) fn add_voter_or_learner(&mut self, id: u64, learner: bool) -> Result<()> { debug!( self.logger, @@ -2362,7 +2082,6 @@ impl Raft { /// /// * `id` is already a voter. /// * `id` is already a learner. - /// * There is a pending membership change. (See `is_in_membership_change()`) pub fn add_node(&mut self, id: u64) -> Result<()> { self.add_voter_or_learner(id, false) } @@ -2373,7 +2092,6 @@ impl Raft { /// /// * `id` is already a voter. /// * `id` is already a learner. - /// * There is a pending membership change. (See `is_in_membership_change()`) pub fn add_learner(&mut self, id: u64) -> Result<()> { self.add_voter_or_learner(id, true) } @@ -2383,7 +2101,6 @@ impl Raft { /// # Errors /// /// * `id` is not a voter or learner. - /// * There is a pending membership change. (See `is_in_membership_change()`) pub fn remove_node(&mut self, id: u64) -> Result<()> { self.mut_prs().remove(id)?; @@ -2498,11 +2215,6 @@ impl Raft { self.lead_transferee = None; } - /// Determine if the Raft is in a transition state under Joint Consensus. - pub fn is_in_membership_change(&self) -> bool { - self.prs().is_in_membership_change() - } - fn send_request_snapshot(&mut self) { let mut m = Message::default(); m.set_msg_type(MessageType::MsgAppendResponse); diff --git a/src/raw_node.rs b/src/raw_node.rs index 172ec020a..d93c5ad74 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -322,18 +322,10 @@ impl RawNode { } /// Takes the conf change and applies it. - /// - /// # Panics - /// - /// In the case of `BeginMembershipChange` or `FinalizeConfChange` returning errors this will panic. - /// - /// For a safe interface for these directly call `this.raft.begin_membership_change(entry)` or - /// `this.raft.finalize_membership_change(entry)` respectively. pub fn apply_conf_change(&mut self, cc: &ConfChange) -> Result { - if cc.node_id == INVALID_ID && cc.get_change_type() != ConfChangeType::BeginMembershipChange - { + if cc.node_id == INVALID_ID { let mut cs = ConfState::default(); - cs.nodes = self.raft.prs().voter_ids().iter().cloned().collect(); + cs.voters = self.raft.prs().voter_ids().iter().cloned().collect(); cs.learners = self.raft.prs().learner_ids().iter().cloned().collect(); return Ok(cs); } @@ -342,10 +334,6 @@ impl RawNode { ConfChangeType::AddNode => self.raft.add_node(nid)?, ConfChangeType::AddLearnerNode => self.raft.add_learner(nid)?, ConfChangeType::RemoveNode => self.raft.remove_node(nid)?, - ConfChangeType::BeginMembershipChange => self.raft.begin_membership_change(cc)?, - ConfChangeType::FinalizeMembershipChange => { - self.raft.mut_prs().finalize_membership_change()? - } }; Ok(self.raft.prs().configuration().to_conf_state()) diff --git a/src/storage.rs b/src/storage.rs index aa329bc7c..62f47097b 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -48,19 +48,6 @@ pub struct RaftState { /// Records the current node IDs like `[1, 2, 3]` in the cluster. Every Raft node must have a /// unique ID in the cluster; pub conf_state: ConfState, - - /// If this peer is in the middle of a membership change (The period between - /// `BeginMembershipChange` and `FinalizeMembershipChange`) this will hold the final desired - /// state. - #[get = "pub"] - #[set] - pending_conf_state: Option, - - /// If `pending_conf_state` exists this will contain the index of the `BeginMembershipChange` - /// entry. - #[get = "pub"] - #[set] - pending_conf_state_start_index: Option, } impl RaftState { @@ -69,8 +56,6 @@ impl RaftState { RaftState { hard_state, conf_state, - pending_conf_state: None, - pending_conf_state_start_index: None, } } /// Indicates the `RaftState` is initialized or not. @@ -189,16 +174,8 @@ impl MemStorageCore { } /// Saves the current conf state. - pub fn set_conf_state( - &mut self, - cs: ConfState, - pending_membership_change: Option<(ConfState, u64)>, - ) { + pub fn set_conf_state(&mut self, cs: ConfState) { self.raft_state.conf_state = cs; - if let Some((cs, idx)) = pending_membership_change { - self.raft_state.pending_conf_state = Some(cs); - self.raft_state.pending_conf_state_start_index = Some(idx); - } } #[inline] @@ -242,12 +219,6 @@ impl MemStorageCore { // Update conf states. self.raft_state.conf_state = meta.take_conf_state(); - if meta.next_conf_state_index > 0 { - let cs = meta.take_next_conf_state(); - let i = meta.next_conf_state_index; - self.raft_state.pending_conf_state = Some(cs); - self.raft_state.pending_conf_state_start_index = Some(i); - } Ok(()) } @@ -262,11 +233,6 @@ impl MemStorageCore { meta.term = term; meta.set_conf_state(self.raft_state.conf_state.clone()); - if let Some(ref cs) = self.raft_state.pending_conf_state { - let i = self.raft_state.pending_conf_state_start_index.unwrap(); - meta.set_next_conf_state(cs.clone()); - meta.next_conf_state_index = i; - } snapshot } @@ -331,22 +297,11 @@ impl MemStorageCore { } /// Commit to `idx` and set configuration to the given states. Only used for tests. - pub fn commit_to_and_set_conf_states( - &mut self, - idx: u64, - cs: Option, - pending_membership_change: Option, - ) -> Result<()> { + pub fn commit_to_and_set_conf_states(&mut self, idx: u64, cs: Option) -> Result<()> { self.commit_to(idx)?; if let Some(cs) = cs { self.raft_state.conf_state = cs; } - if let Some(mut pending_change) = pending_membership_change { - let conf_state = pending_change.take_configuration(); - self.raft_state.pending_conf_state = Some(conf_state); - let index = pending_change.start_index; - self.raft_state.pending_conf_state_start_index = Some(index); - } Ok(()) } @@ -521,11 +476,11 @@ mod test { m.compute_size() as u32 } - fn new_snapshot(index: u64, term: u64, nodes: Vec) -> Snapshot { + fn new_snapshot(index: u64, term: u64, voters: Vec) -> Snapshot { let mut s = Snapshot::default(); s.mut_metadata().index = index; s.mut_metadata().term = term; - s.mut_metadata().mut_conf_state().nodes = nodes; + s.mut_metadata().mut_conf_state().voters = voters; s } @@ -680,7 +635,7 @@ mod test { let ents = vec![new_entry(3, 3), new_entry(4, 4), new_entry(5, 5)]; let nodes = vec![1, 2, 3]; let mut conf_state = ConfState::default(); - conf_state.nodes = nodes.clone(); + conf_state.voters = nodes.clone(); let unavailable = Err(RaftError::Store( StorageError::SnapshotTemporarilyUnavailable,