Skip to content

Commit

Permalink
Extract Network and Interface from utils
Browse files Browse the repository at this point in the history
  • Loading branch information
Hoverbear committed Sep 22, 2018
1 parent 32040f2 commit 8ac3808
Show file tree
Hide file tree
Showing 9 changed files with 364 additions and 211 deletions.
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,25 @@ description = "The rust language implementation of Raft algorithm."
categories = ["algorithms", "database-implementations"]

[features]
default = []
default = ["harness"]
# Enable failpoints
failpoint = ["fail"]
# Enable the testing harness.
# This is required to run `cargo test` or `cargo bench`.
harness = ["env_logger"]

[dependencies]
log = ">0.2"
protobuf = "2.0.4"
quick-error = "1.2.2"
rand = "0.5.4"
fxhash = "0.2.1"
# Rust does not offer a common feature flag for tests and benches.
# So we have these optional behind features.
fail = { version = "0.2", optional = true }
env_logger = { version = "0.5", optional = true }

[dev-dependencies]
env_logger = "0.5"
criterion = ">0.2.4"
lazy_static = "1.0"

Expand Down
120 changes: 120 additions & 0 deletions src/harness/interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::{Deref, DerefMut};
use {eraftpb::Message, storage::MemStorage, Progress, ProgressSet, Raft, Result};

/// A simulated Raft façade for testing.
///
/// If the contained value is a `Some` operations happen. If they are a `None` operations are
/// a no-op.
///
// Compare to upstream, we use struct instead of trait here.
// Because to be able to cast Interface later, we have to make
// Raft derive Any, which will require a lot of dependencies to derive Any.
// That's not worthy for just testing purpose.
pub struct Interface {
/// The raft peer.
pub raft: Option<Raft<MemStorage>>,
}

impl Interface {
/// Create a new interface to a new raft.
pub fn new(r: Raft<MemStorage>) -> Interface {
Interface { raft: Some(r) }
}

/// Step the raft, if it exists.
pub fn step(&mut self, m: Message) -> Result<()> {
match self.raft {
Some(_) => Raft::step(self, m),
None => Ok(()),
}
}

/// Read messages out of the raft.
pub fn read_messages(&mut self) -> Vec<Message> {
match self.raft {
Some(_) => self.msgs.drain(..).collect(),
None => vec![],
}
}

/// Initialize a raft with the given ID and peer set.
pub fn initial(&mut self, id: u64, ids: &[u64]) {
if self.raft.is_some() {
self.id = id;
let prs = self.take_prs();
self.set_prs(ProgressSet::new(ids.len(), prs.learners().len()));
for id in ids {
if prs.learners().contains_key(id) {
let progress = Progress {
is_learner: true,
..Default::default()
};
if let Err(e) = self.mut_prs().insert_learner(*id, progress) {
panic!("{}", e);
}
} else {
let progress = Progress {
..Default::default()
};
if let Err(e) = self.mut_prs().insert_voter(*id, progress) {
panic!("{}", e);
}
}
}
let term = self.term;
self.reset(term);
}
}
}

impl From<Option<Raft<MemStorage>>> for Interface {
fn from(raft: Option<Raft<MemStorage>>) -> Self {
Self { raft }
}
}

impl From<Raft<MemStorage>> for Interface {
fn from(raft: Raft<MemStorage>) -> Self {
Self { raft: Some(raft) }
}
}

impl Deref for Interface {
type Target = Raft<MemStorage>;
fn deref(&self) -> &Raft<MemStorage> {
self.raft.as_ref().unwrap()
}
}

impl DerefMut for Interface {
fn deref_mut(&mut self) -> &mut Raft<MemStorage> {
self.raft.as_mut().unwrap()
}
}
39 changes: 39 additions & 0 deletions src/harness/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*!
This module contains various testing harness utilities for Raft.
> If you want to build Raft without this, disable the `harness` feature.
*/

mod interface;
mod network;

pub use self::{interface::Interface, network::Network};
184 changes: 184 additions & 0 deletions src/harness/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::interface::Interface;
use rand;
use std::collections::HashMap;
use {
eraftpb::{Message, MessageType},
storage::MemStorage,
Config, Raft, NO_LIMIT,
};

#[derive(Default, Debug, PartialEq, Eq, Hash)]
struct Connem {
from: u64,
to: u64,
}

/// A simulated network for testing.
///
/// You can use this to create a test network of Raft nodes.
///
/// *Please note:* no actual network calls are made.
#[derive(Default)]
pub struct Network {
/// The set of raft peers.
pub peers: HashMap<u64, Interface>,
/// The storage of the raft peers.
pub storage: HashMap<u64, MemStorage>,
dropm: HashMap<Connem, f64>,
ignorem: HashMap<MessageType, bool>,
}

impl Network {
/// Initializes a network from peers.
///
/// Nodes will recieve their ID based on their index in the vector, starting with 1.
///
/// A `None` node will be replaced with a new Raft node.
pub fn new(peers: Vec<Option<Interface>>) -> Network {
Network::new_with_config(peers, false)
}

/// Explicitly set the pre_vote option on newly created rafts.
///
/// **TODO:** Make this accept any config.
pub fn new_with_config(mut peers: Vec<Option<Interface>>, pre_vote: bool) -> Network {
let size = peers.len();
let peer_addrs: Vec<u64> = (1..size as u64 + 1).collect();
let mut nstorage = HashMap::new();
let mut npeers = HashMap::new();
for (p, id) in peers.drain(..).zip(peer_addrs.clone()) {
match p {
None => {
nstorage.insert(id, MemStorage::default());
let r = Interface::new(Raft::new(
&Config {
id,
peers: peer_addrs.clone(),
election_tick: 10,
heartbeat_tick: 1,
max_size_per_msg: NO_LIMIT,
max_inflight_msgs: 256,
pre_vote,
..Default::default()
},
nstorage[&id].clone(),
));
npeers.insert(id, r);
}
Some(mut p) => {
p.initial(id, &peer_addrs);
npeers.insert(id, p);
}
}
}
Network {
peers: npeers,
storage: nstorage,
..Default::default()
}
}

/// Ignore a given `MessageType`.
pub fn ignore(&mut self, t: MessageType) {
self.ignorem.insert(t, true);
}

/// Filter out messages that should be dropped according to rules set by `ignore` or `drop`.
pub fn filter(&self, mut msgs: Vec<Message>) -> Vec<Message> {
msgs.drain(..)
.filter(|m| {
if self
.ignorem
.get(&m.get_msg_type())
.cloned()
.unwrap_or(false)
{
return false;
}
// hups never go over the network, so don't drop them but panic
assert_ne!(m.get_msg_type(), MessageType::MsgHup, "unexpected msgHup");
let perc = self
.dropm
.get(&Connem {
from: m.get_from(),
to: m.get_to(),
}).cloned()
.unwrap_or(0f64);
rand::random::<f64>() >= perc
}).collect()
}

/// Instruct the cluster to `step` through the given messages.
pub fn send(&mut self, msgs: Vec<Message>) {
let mut msgs = msgs;
while !msgs.is_empty() {
let mut new_msgs = vec![];
for m in msgs.drain(..) {
let resp = {
let p = self.peers.get_mut(&m.get_to()).unwrap();
let _ = p.step(m);
p.read_messages()
};
new_msgs.append(&mut self.filter(resp));
}
msgs.append(&mut new_msgs);
}
}

/// Ignore messages from `from` to `to` at `perc` percent chance.
///
/// `perc` set to `1f64` is a 100% chance, `0f64` is a 0% chance.
pub fn drop(&mut self, from: u64, to: u64, perc: f64) {
self.dropm.insert(Connem { from, to }, perc);
}

/// Cut the communication between the two given nodes.
pub fn cut(&mut self, one: u64, other: u64) {
self.drop(one, other, 1f64);
self.drop(other, one, 1f64);
}

/// Isolate the given raft to and from all other raft in the cluster.
pub fn isolate(&mut self, id: u64) {
for i in 0..self.peers.len() as u64 {
let nid = i + 1;
if nid != id {
self.drop(id, nid, 1.0);
self.drop(nid, id, 1.0);
}
}
}

/// Recover the cluster conditions applied with `drop` and `ignore`.
pub fn recover(&mut self) {
self.dropm = HashMap::new();
self.ignorem = HashMap::new();
}
}
Loading

0 comments on commit 8ac3808

Please sign in to comment.