Skip to content

Commit

Permalink
Bump to v0.4.3 (#231)
Browse files Browse the repository at this point in the history
* raft: leader respond to learner read index message (#220)

Signed-off-by: nolouch <[email protected]>

* Bump to v0.4.3

Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus authored May 8, 2019
1 parent 3b41216 commit e501361
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.4.3 - 2019-05-08

- Leader responds to learner read index message. (https://github.com/pingcap/raft-rs/pull/220)

# 0.4.2 - 2019-04-29

- Fix potential two leaders at the same term when transferring leader. (https://github.com/pingcap/raft-rs/pull/225)
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "raft"
version = "0.4.2"
version = "0.4.3"
authors = ["The TiKV Project Developers"]
license = "Apache-2.0"
keywords = ["raft", "distributed-systems", "ha"]
Expand Down
1 change: 0 additions & 1 deletion benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ mod suites;
pub const DEFAULT_RAFT_SETS: [(usize, usize); 4] = [(0, 0), (3, 1), (5, 2), (7, 3)];

fn main() {
criterion::init_logging();
let mut c = Criterion::default()
// Configure defaults before overriding with args.
.warm_up_time(Duration::from_millis(500))
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ For more information, check out an [example](examples/single_mem_node/main.rs#L1
*/

#![cfg_attr(not(feature = "cargo-clippy"), allow(unknown_lints))]
#![cfg_attr(feature = "cargo-clippy", feature(tool_lints))]
#![deny(missing_docs)]

#[cfg(feature = "failpoint")]
Expand Down
30 changes: 21 additions & 9 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ impl<T: Storage> Raft<T> {

self.election_elapsed = 0;
let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id));
self.step(m).is_ok();
let _ = self.step(m);
true
}

Expand All @@ -695,7 +695,7 @@ impl<T: Storage> Raft<T> {
if self.check_quorum {
let m = new_message(INVALID_ID, MessageType::MsgCheckQuorum, Some(self.id));
has_ready = true;
self.step(m).is_ok();
let _ = self.step(m);
}
if self.state == StateRole::Leader && self.lead_transferee.is_some() {
self.abort_leader_transfer()
Expand All @@ -710,7 +710,7 @@ impl<T: Storage> Raft<T> {
self.heartbeat_elapsed = 0;
has_ready = true;
let m = new_message(INVALID_ID, MessageType::MsgBeat, Some(self.id));
self.step(m).is_ok();
let _ = self.step(m);
}
has_ready
}
Expand Down Expand Up @@ -794,7 +794,7 @@ impl<T: Storage> Raft<T> {
}

fn num_pending_conf(&self, ents: &[Entry]) -> usize {
ents.into_iter()
ents.iter()
.filter(|e| e.get_entry_type() == EntryType::EntryConfChange)
.count()
}
Expand Down Expand Up @@ -1486,11 +1486,23 @@ impl<T: Storage> Raft<T> {
}
}
} else {
let rs = ReadState {
index: self.raft_log.committed,
request_ctx: m.take_entries()[0].take_data(),
};
self.read_states.push(rs);
// there is only one voting member (the leader) in the cluster
if m.get_from() == INVALID_ID || m.get_from() == self.id {
// from leader itself
let rs = ReadState {
index: self.raft_log.committed,
request_ctx: m.take_entries()[0].take_data(),
};
self.read_states.push(rs);
} else {
// from learner member
let mut to_send = Message::default();
to_send.set_to(m.get_from());
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.set_index(self.raft_log.committed);
to_send.set_entries(m.take_entries());
self.send(to_send);
}
}
return Ok(());
}
Expand Down
8 changes: 4 additions & 4 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ impl<T: Storage> RawNode<T> {
m.set_msg_type(MessageType::MsgUnreachable);
m.set_from(id);
// we don't care if it is ok actually
self.raft.step(m).is_ok();
let _ = self.raft.step(m);
}

/// ReportSnapshot reports the status of the sent snapshot.
Expand All @@ -454,15 +454,15 @@ impl<T: Storage> RawNode<T> {
m.set_from(id);
m.set_reject(rej);
// we don't care if it is ok actually
self.raft.step(m).is_ok();
let _ = self.raft.step(m);
}

/// TransferLeader tries to transfer leadership to the given transferee.
pub fn transfer_leader(&mut self, transferee: u64) {
let mut m = Message::new();
m.set_msg_type(MessageType::MsgTransferLeader);
m.set_from(transferee);
self.raft.step(m).is_ok();
let _ = self.raft.step(m);
}

/// ReadIndex requests a read state. The read state will be set in ready.
Expand All @@ -475,7 +475,7 @@ impl<T: Storage> RawNode<T> {
let mut e = Entry::new();
e.set_data(rctx);
m.set_entries(RepeatedField::from_vec(vec![e]));
self.raft.step(m).is_ok();
let _ = self.raft.step(m);
}

/// Returns the store as an immutable reference.
Expand Down
73 changes: 73 additions & 0 deletions tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,79 @@ fn test_read_only_option_safe() {
}
}

#[test]
fn test_read_only_with_learner() {
setup_for_test();
let a = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage());
let b = new_test_learner_raft(2, vec![1], vec![2], 10, 1, new_storage());

let mut nt = Network::new(vec![Some(a), Some(b)]);

// we can not let system choose the value of randomizedElectionTimeout
// otherwise it will introduce some uncertainty into this test case
// we need to ensure randomizedElectionTimeout > electionTimeout here
let b_election_timeout = nt.peers[&2].get_election_timeout();
nt.peers
.get_mut(&2)
.unwrap()
.set_randomized_election_timeout(b_election_timeout + 1);

for _ in 0..b_election_timeout {
nt.peers.get_mut(&2).unwrap().tick();
}
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

assert_eq!(nt.peers[&1].state, StateRole::Leader);
assert_eq!(nt.peers[&2].state, StateRole::Follower);

let mut tests = vec![
(1, 10, 11, "ctx1"),
(2, 10, 21, "ctx2"),
(1, 10, 31, "ctx3"),
(2, 10, 41, "ctx4"),
];

for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() {
for _ in 0..proposals {
nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]);
}

let e = new_entry(0, 0, Some(wctx));
nt.send(vec![new_message_with_entries(
id,
id,
MessageType::MsgReadIndex,
vec![e],
)]);

let read_states: Vec<ReadState> = nt
.peers
.get_mut(&id)
.unwrap()
.read_states
.drain(..)
.collect();
assert_eq!(
read_states.is_empty(),
false,
"#{}: read_states is empty, want non-empty",
i
);
let rs = &read_states[0];
assert_eq!(
rs.index, wri,
"#{}: read_index = {}, want {}",
i, rs.index, wri
);
let vec_wctx = wctx.as_bytes().to_vec();
assert_eq!(
rs.request_ctx, vec_wctx,
"#{}: request_ctx = {:?}, want {:?}",
i, rs.request_ctx, vec_wctx
);
}
}

#[test]
fn test_read_only_option_lease() {
setup_for_test();
Expand Down

0 comments on commit e501361

Please sign in to comment.