diff --git a/.gitignore b/.gitignore index a484e4d95..c4f2457f7 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,8 @@ target tmp /bin +harness/target + Cargo.lock *.rs.bk *.rs.fmt diff --git a/Cargo.toml b/Cargo.toml index 0334bbd29..2d850bd04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,8 +13,10 @@ categories = ["algorithms", "database-implementations"] [features] default = [] +# Enable failpoints failpoint = ["fail"] +# Make sure to synchronize updates with Harness. [dependencies] log = ">0.2" protobuf = "~2.0-2.2" @@ -28,6 +30,7 @@ getset = "0.0.6" env_logger = "0.5" criterion = ">0.2.4" lazy_static = "1.0" +harness = { path = "harness" } regex = "1.1" [[bench]] diff --git a/harness/Cargo.toml b/harness/Cargo.toml new file mode 100644 index 000000000..b6a3a9a26 --- /dev/null +++ b/harness/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "harness" +version = "0.1.0" +authors = ["The TiKV Project Developers"] +license = "Apache-2.0" +keywords = [] +repository = "https://github.com/pingcap/raft-rs/harness" +readme = "README.md" +homepage = "https://github.com/pingcap/raft-rs/harness" +description = "A testing harness for Raft." +categories = [] + +# Make sure to synchronize updates with Raft. +[dependencies] +raft = { path = ".." } +rand = "0.5.4" +env_logger = "0.5" diff --git a/harness/src/interface.rs b/harness/src/interface.rs new file mode 100644 index 000000000..2d03b9ea0 --- /dev/null +++ b/harness/src/interface.rs @@ -0,0 +1,115 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use raft::{eraftpb::Message, storage::MemStorage, Progress, ProgressSet, Raft, Result}; +use std::ops::{Deref, DerefMut}; + +/// A simulated Raft façade for testing. +/// +/// If the contained value is a `Some` operations happen. If they are a `None` operations are +/// a no-op. +/// +// Compare to upstream, we use struct instead of trait here. +// Because to be able to cast Interface later, we have to make +// Raft derive Any, which will require a lot of dependencies to derive Any. +// That's not worthy for just testing purpose. +pub struct Interface { + /// The raft peer. + pub raft: Option>, +} + +impl Interface { + /// Create a new interface to a new raft. + pub fn new(r: Raft) -> Interface { + Interface { raft: Some(r) } + } + + /// Step the raft, if it exists. + pub fn step(&mut self, m: Message) -> Result<()> { + match self.raft { + Some(_) => Raft::step(self, m), + None => Ok(()), + } + } + + /// Read messages out of the raft. + pub fn read_messages(&mut self) -> Vec { + match self.raft { + Some(_) => self.msgs.drain(..).collect(), + None => vec![], + } + } + + /// Initialize a raft with the given ID and peer set. + pub fn initial(&mut self, id: u64, ids: &[u64]) { + if self.raft.is_some() { + self.id = id; + let prs = self.take_prs(); + self.set_prs(ProgressSet::with_capacity( + ids.len(), + prs.learner_ids().len(), + )); + for id in ids { + let progress = Progress::new(0, 256); + if prs.learner_ids().contains(id) { + if let Err(e) = self.mut_prs().insert_learner(*id, progress) { + panic!("{}", e); + } + } else if let Err(e) = self.mut_prs().insert_voter(*id, progress) { + panic!("{}", e); + } + } + let term = self.term; + self.reset(term); + } + } +} + +impl From>> for Interface { + fn from(raft: Option>) -> Self { + Self { raft } + } +} + +impl From> for Interface { + fn from(raft: Raft) -> Self { + Self { raft: Some(raft) } + } +} + +impl Deref for Interface { + type Target = Raft; + fn deref(&self) -> &Raft { + self.raft.as_ref().unwrap() + } +} + +impl DerefMut for Interface { + fn deref_mut(&mut self) -> &mut Raft { + self.raft.as_mut().unwrap() + } +} diff --git a/harness/src/lib.rs b/harness/src/lib.rs new file mode 100644 index 000000000..9d005e0c8 --- /dev/null +++ b/harness/src/lib.rs @@ -0,0 +1,49 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/*! + +This module contains various testing harness utilities for Raft. + +> If you want to build Raft without this, disable the `harness` feature. + +*/ + +extern crate env_logger; +extern crate raft; +extern crate rand; + +mod interface; +mod network; + +pub use self::{interface::Interface, network::Network}; + +/// Do any common test initialization. Eg set up logging. +#[doc(hidden)] +pub fn setup_for_test() { + let _ = env_logger::try_init(); +} diff --git a/harness/src/network.rs b/harness/src/network.rs new file mode 100644 index 000000000..095a194ad --- /dev/null +++ b/harness/src/network.rs @@ -0,0 +1,208 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::interface::Interface; +use raft::{ + eraftpb::{Message, MessageType}, + storage::MemStorage, + Config, Raft, Result, NO_LIMIT, +}; +use rand; +use std::collections::HashMap; + +#[derive(Default, Debug, PartialEq, Eq, Hash)] +struct Connem { + from: u64, + to: u64, +} + +/// A simulated network for testing. +/// +/// You can use this to create a test network of Raft nodes. +/// +/// *Please note:* no actual network calls are made. +#[derive(Default)] +pub struct Network { + /// The set of raft peers. + pub peers: HashMap, + /// The storage of the raft peers. + pub storage: HashMap, + dropm: HashMap, + ignorem: HashMap, +} + +impl Network { + /// Initializes a network from peers. + /// + /// Nodes will recieve their ID based on their index in the vector, starting with 1. + /// + /// A `None` node will be replaced with a new Raft node. + pub fn new(peers: Vec>) -> Network { + Network::new_with_config(peers, false) + } + + /// Explicitly set the pre_vote option on newly created rafts. + /// + /// **TODO:** Make this accept any config. + pub fn new_with_config(mut peers: Vec>, pre_vote: bool) -> Network { + let size = peers.len(); + let peer_addrs: Vec = (1..=size as u64).collect(); + let mut nstorage = HashMap::new(); + let mut npeers = HashMap::new(); + for (p, id) in peers.drain(..).zip(peer_addrs.clone()) { + match p { + None => { + nstorage.insert(id, MemStorage::default()); + let r = Interface::new( + Raft::new( + &Config { + id, + peers: peer_addrs.clone(), + election_tick: 10, + heartbeat_tick: 1, + max_size_per_msg: NO_LIMIT, + max_inflight_msgs: 256, + pre_vote, + ..Default::default() + }, + nstorage[&id].clone(), + ) + .unwrap(), + ); + npeers.insert(id, r); + } + Some(mut p) => { + p.initial(id, &peer_addrs); + npeers.insert(id, p); + } + } + } + Network { + peers: npeers, + storage: nstorage, + ..Default::default() + } + } + + /// Ignore a given `MessageType`. + pub fn ignore(&mut self, t: MessageType) { + self.ignorem.insert(t, true); + } + + /// Filter out messages that should be dropped according to rules set by `ignore` or `drop`. + pub fn filter(&self, msgs: impl IntoIterator) -> Vec { + msgs.into_iter() + .filter(|m| { + if self + .ignorem + .get(&m.get_msg_type()) + .cloned() + .unwrap_or(false) + { + return false; + } + // hups never go over the network, so don't drop them but panic + assert_ne!(m.get_msg_type(), MessageType::MsgHup, "unexpected msgHup"); + let perc = self + .dropm + .get(&Connem { + from: m.get_from(), + to: m.get_to(), + }) + .cloned() + .unwrap_or(0f64); + rand::random::() >= perc + }) + .collect() + } + + pub fn read_messages(&mut self) -> Vec { + self.peers + .iter_mut() + .flat_map(|(_peer, progress)| progress.read_messages()) + .collect() + } + + /// Instruct the cluster to `step` through the given messages. + pub fn send(&mut self, msgs: Vec) { + let mut msgs = msgs; + while !msgs.is_empty() { + let mut new_msgs = vec![]; + for m in msgs.drain(..) { + let resp = { + let p = self.peers.get_mut(&m.get_to()).unwrap(); + let _ = p.step(m); + p.read_messages() + }; + new_msgs.append(&mut self.filter(resp)); + } + msgs.append(&mut new_msgs); + } + } + + /// Dispatches the given messages to the appropriate peers. + /// + /// Unlike `send` this does not gather and send any responses. It also does not ignore errors. + pub fn dispatch(&mut self, messages: impl IntoIterator) -> Result<()> { + for message in self.filter(messages.into_iter().map(Into::into)) { + let to = message.get_to(); + let peer = self.peers.get_mut(&to).unwrap(); + peer.step(message)?; + } + Ok(()) + } + + /// Ignore messages from `from` to `to` at `perc` percent chance. + /// + /// `perc` set to `1f64` is a 100% chance, `0f64` is a 0% chance. + pub fn drop(&mut self, from: u64, to: u64, perc: f64) { + self.dropm.insert(Connem { from, to }, perc); + } + + /// Cut the communication between the two given nodes. + pub fn cut(&mut self, one: u64, other: u64) { + self.drop(one, other, 1f64); + self.drop(other, one, 1f64); + } + + /// Isolate the given raft to and from all other raft in the cluster. + pub fn isolate(&mut self, id: u64) { + for i in 0..self.peers.len() as u64 { + let nid = i + 1; + if nid != id { + self.drop(id, nid, 1.0); + self.drop(nid, id, 1.0); + } + } + } + + /// Recover the cluster conditions applied with `drop` and `ignore`. + pub fn recover(&mut self) { + self.dropm = HashMap::new(); + self.ignorem = HashMap::new(); + } +} diff --git a/src/errors.rs b/src/errors.rs index 2e187872c..3d73dd26e 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -146,7 +146,7 @@ pub type Result = result::Result; #[cfg(test)] mod tests { use super::*; - use setup_for_test; + use harness::setup_for_test; use std::io; #[test] diff --git a/src/lib.rs b/src/lib.rs index f664b9974..281844197 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -369,14 +369,14 @@ before taking old, removed peers offline. #[cfg(feature = "failpoint")] #[macro_use] extern crate fail; +#[cfg(test)] +extern crate harness; extern crate hashbrown; #[macro_use] extern crate log; extern crate protobuf; #[macro_use] extern crate quick_error; -#[cfg(test)] -extern crate env_logger; extern crate rand; #[macro_use] extern crate getset; @@ -441,9 +441,3 @@ pub mod prelude { pub use read_only::{ReadOnlyOption, ReadState}; } - -/// Do any common test initialization. Eg set up logging, setup fail-rs. -#[cfg(test)] -fn setup_for_test() { - let _ = env_logger::try_init(); -} diff --git a/src/log_unstable.rs b/src/log_unstable.rs index 87d1a1c44..e4b93ca15 100644 --- a/src/log_unstable.rs +++ b/src/log_unstable.rs @@ -184,8 +184,8 @@ impl Unstable { #[cfg(test)] mod test { use eraftpb::{Entry, Snapshot, SnapshotMetadata}; + use harness::setup_for_test; use log_unstable::Unstable; - use setup_for_test; fn new_entry(index: u64, term: u64) -> Entry { let mut e = Entry::new(); diff --git a/src/progress.rs b/src/progress.rs index b12beabaf..b2626fa48 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -958,8 +958,8 @@ impl Inflights { #[cfg(test)] mod test { + use harness::setup_for_test; use progress::Inflights; - use setup_for_test; #[test] fn test_inflight_add() { diff --git a/src/raft_log.rs b/src/raft_log.rs index b348f57a2..4d8942c9f 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -497,9 +497,9 @@ mod test { use eraftpb; use errors::{Error, StorageError}; + use harness::setup_for_test; use protobuf; use raft_log::{self, RaftLog}; - use setup_for_test; use storage::MemStorage; fn new_raft_log(s: MemStorage) -> RaftLog { diff --git a/src/raw_node.rs b/src/raw_node.rs index faef450c2..ed0f10132 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -532,7 +532,7 @@ impl RawNode { mod test { use super::is_local_msg; use eraftpb::MessageType; - use setup_for_test; + use harness::setup_for_test; #[test] fn test_is_local_msg() { diff --git a/src/storage.rs b/src/storage.rs index 67df549de..a54f7be98 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -374,11 +374,12 @@ impl Storage for MemStorage { #[cfg(test)] mod test { + extern crate harness; use eraftpb::{ConfState, Entry, Snapshot}; + use harness::setup_for_test; use protobuf; use errors::{Error as RaftError, StorageError}; - use setup_for_test; use storage::{MemStorage, Storage}; // TODO extract these duplicated utility functions for tests diff --git a/tests/failpoint_cases/mod.rs b/tests/failpoint_cases/mod.rs index e97a8de02..4b5f4f505 100644 --- a/tests/failpoint_cases/mod.rs +++ b/tests/failpoint_cases/mod.rs @@ -12,6 +12,7 @@ // limitations under the License. use fail; +use harness::setup_for_test; use raft::eraftpb::MessageType; use std::sync::*; use test_util::*; diff --git a/tests/integration_cases/test_membership_changes.rs b/tests/integration_cases/test_membership_changes.rs index d23ac17c3..c96e633f1 100644 --- a/tests/integration_cases/test_membership_changes.rs +++ b/tests/integration_cases/test_membership_changes.rs @@ -12,6 +12,7 @@ // limitations under the License. // // +use harness::{setup_for_test, Network}; use hashbrown::{HashMap, HashSet}; use protobuf::{self, RepeatedField}; use raft::{ @@ -22,7 +23,7 @@ use raft::{ Config, Configuration, Raft, Result, INVALID_ID, NO_LIMIT, }; use std::ops::{Deref, DerefMut}; -use test_util::{new_message, setup_for_test, Network}; +use test_util::new_message; // Test that the API itself works. // diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index 4b31e91d6..1d58ec740 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -29,6 +29,7 @@ use std::cmp; use std::collections::HashMap; use std::panic::{self, AssertUnwindSafe}; +use harness::*; use hashbrown::HashSet; use protobuf::{self, RepeatedField}; use raft::eraftpb::{ diff --git a/tests/integration_cases/test_raft_flow_control.rs b/tests/integration_cases/test_raft_flow_control.rs index c85a79d99..8f161734b 100644 --- a/tests/integration_cases/test_raft_flow_control.rs +++ b/tests/integration_cases/test_raft_flow_control.rs @@ -25,6 +25,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use harness::setup_for_test; use raft::eraftpb::*; use test_util::*; diff --git a/tests/integration_cases/test_raft_paper.rs b/tests/integration_cases/test_raft_paper.rs index fd309260b..9eecda728 100644 --- a/tests/integration_cases/test_raft_paper.rs +++ b/tests/integration_cases/test_raft_paper.rs @@ -25,6 +25,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use harness::*; use protobuf::RepeatedField; use raft::eraftpb::*; use raft::storage::MemStorage; diff --git a/tests/integration_cases/test_raft_snap.rs b/tests/integration_cases/test_raft_snap.rs index 6b7a0aeb1..97924e0b1 100644 --- a/tests/integration_cases/test_raft_snap.rs +++ b/tests/integration_cases/test_raft_snap.rs @@ -25,6 +25,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use harness::setup_for_test; use raft::eraftpb::*; use test_util::*; diff --git a/tests/integration_cases/test_raw_node.rs b/tests/integration_cases/test_raw_node.rs index abe8503c8..e253ccba8 100644 --- a/tests/integration_cases/test_raw_node.rs +++ b/tests/integration_cases/test_raw_node.rs @@ -25,6 +25,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use harness::*; use protobuf::{self, ProtobufEnum}; use raft::eraftpb::*; use raft::storage::MemStorage; diff --git a/tests/test_util/mod.rs b/tests/test_util/mod.rs index 1e21cf66e..88f6b8a3b 100644 --- a/tests/test_util/mod.rs +++ b/tests/test_util/mod.rs @@ -25,19 +25,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use env_logger; +use harness::*; use protobuf::RepeatedField; use raft::eraftpb::*; use raft::storage::MemStorage; use raft::*; -use rand; -use std::collections::HashMap; -use std::ops::*; -/// Do any common test initialization. Eg set up logging, setup fail-rs. -pub fn setup_for_test() { - let _ = env_logger::try_init(); -} +#[allow(clippy::declare_interior_mutable_const)] +pub const NOP_STEPPER: Option = Some(Interface { raft: None }); pub fn ltoa(raft_log: &RaftLog) -> String { let mut s = format!("committed: {}\n", raft_log.committed); @@ -69,76 +64,6 @@ pub fn new_test_config( } } -/// Compare to upstream, we use struct instead of trait here. -/// Because to be able to cast Interface later, we have to make -/// Raft derive Any, which will require a lot of dependencies to derive Any. -/// That's not worthy for just testing purpose. -pub struct Interface { - pub raft: Option>, -} - -impl From> for Interface { - fn from(value: Raft) -> Self { - Interface::new(value) - } -} - -impl Interface { - pub fn new(r: Raft) -> Interface { - Interface { raft: Some(r) } - } - - pub fn step(&mut self, m: Message) -> Result<()> { - match self.raft { - Some(_) => Raft::step(self, m), - None => Ok(()), - } - } - - pub fn read_messages(&mut self) -> Vec { - match self.raft { - Some(_) => self.msgs.drain(..).collect(), - None => vec![], - } - } - - fn initial(&mut self, id: u64, ids: &[u64]) { - if self.raft.is_some() { - self.id = id; - let prs = self.take_prs(); - self.set_prs(ProgressSet::with_capacity( - ids.len(), - prs.learner_ids().len(), - )); - for id in ids { - let progress = Progress::new(0, 256); - if prs.learner_ids().contains(id) { - if let Err(e) = self.mut_prs().insert_learner(*id, progress) { - panic!("{}", e); - } - } else if let Err(e) = self.mut_prs().insert_voter(*id, progress) { - panic!("{}", e); - } - } - let term = self.term; - self.reset(term); - } - } -} - -impl Deref for Interface { - type Target = Raft; - fn deref(&self) -> &Raft { - self.raft.as_ref().unwrap() - } -} - -impl DerefMut for Interface { - fn deref_mut(&mut self) -> &mut Raft { - self.raft.as_mut().unwrap() - } -} - pub fn new_test_raft( id: u64, peers: Vec, @@ -221,153 +146,3 @@ pub fn new_snapshot(index: u64, term: u64, nodes: Vec) -> Snapshot { s.mut_metadata().mut_conf_state().set_nodes(nodes); s } - -#[derive(Default, Debug, PartialEq, Eq, Hash)] -struct Connem { - from: u64, - to: u64, -} - -#[allow(clippy::declare_interior_mutable_const)] -pub const NOP_STEPPER: Option = Some(Interface { raft: None }); - -#[derive(Default)] -pub struct Network { - pub peers: HashMap, - pub storage: HashMap, - dropm: HashMap, - ignorem: HashMap, -} - -impl Network { - // initializes a network from peers. - // A nil node will be replaced with a new *stateMachine. - // A *stateMachine will get its k, id. - // When using stateMachine, the address list is always [1, n]. - pub fn new(peers: Vec>) -> Network { - Network::new_with_config(peers, false) - } - - // new_with_config is like new but sets the configuration pre_vote explicitly - // for any state machines it creates. - pub fn new_with_config(mut peers: Vec>, pre_vote: bool) -> Network { - let size = peers.len(); - let peer_addrs: Vec = (1..=size as u64).collect(); - let mut nstorage = HashMap::new(); - let mut npeers = HashMap::new(); - for (p, id) in peers.drain(..).zip(peer_addrs.clone()) { - match p { - None => { - nstorage.insert(id, new_storage()); - let r = new_test_raft_with_prevote( - id, - peer_addrs.clone(), - 10, - 1, - nstorage[&id].clone(), - pre_vote, - ); - npeers.insert(id, r); - } - Some(mut p) => { - p.initial(id, &peer_addrs); - npeers.insert(id, p); - } - } - } - Network { - peers: npeers, - storage: nstorage, - ..Default::default() - } - } - - pub fn ignore(&mut self, t: MessageType) { - self.ignorem.insert(t, true); - } - - pub fn filter(&self, msgs: impl IntoIterator) -> Vec { - msgs.into_iter() - .filter(|m| { - if self - .ignorem - .get(&m.get_msg_type()) - .cloned() - .unwrap_or(false) - { - return false; - } - // hups never go over the network, so don't drop them but panic - assert_ne!(m.get_msg_type(), MessageType::MsgHup, "unexpected msgHup"); - let perc = self - .dropm - .get(&Connem { - from: m.get_from(), - to: m.get_to(), - }) - .cloned() - .unwrap_or(0f64); - rand::random::() >= perc - }) - .collect() - } - - pub fn read_messages(&mut self) -> Vec { - self.peers - .iter_mut() - .flat_map(|(_peer, progress)| progress.read_messages()) - .collect() - } - - pub fn send(&mut self, msgs: Vec) { - let mut msgs = msgs; - while !msgs.is_empty() { - let mut new_msgs = vec![]; - for m in msgs.drain(..) { - let resp = { - let p = self.peers.get_mut(&m.get_to()).unwrap(); - let _ = p.step(m); - p.read_messages() - }; - new_msgs.append(&mut self.filter(resp)); - } - msgs.append(&mut new_msgs); - } - } - - /// Dispatches the given messages to the appropriate peers. - /// - /// Unlike `send` this does not gather and send any responses. It also does not ignore errors. - pub fn dispatch(&mut self, messages: impl IntoIterator) -> Result<()> { - for message in self.filter(messages) { - let to = message.get_to(); - let peer = self.peers.get_mut(&to).unwrap(); - peer.step(message)?; - } - Ok(()) - } - - pub fn drop(&mut self, from: u64, to: u64, perc: f64) { - self.dropm.insert(Connem { from, to }, perc); - } - - pub fn cut(&mut self, one: u64, other: u64) { - self.drop(one, other, 1f64); - self.drop(other, one, 1f64); - } - - pub fn isolate(&mut self, id: u64) { - for i in 0..self.peers.len() as u64 { - let nid = i + 1; - if nid != id { - self.drop(id, nid, 1.0); - self.drop(nid, id, 1.0); - } - } - } - - pub fn recover(&mut self) { - self.dropm = HashMap::new(); - self.ignorem = HashMap::new(); - } -} diff --git a/tests/tests.rs b/tests/tests.rs index 969aa711f..4fc218e8e 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -16,7 +16,6 @@ #[macro_use] extern crate log; -extern crate env_logger; extern crate protobuf; extern crate raft; extern crate rand; @@ -25,6 +24,7 @@ extern crate rand; extern crate lazy_static; #[cfg(feature = "failpoint")] extern crate fail; +extern crate harness; extern crate hashbrown; /// Get the count of macro's arguments.