From 26a62d3754f77d21dca40d1a5bca0642316d140b Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 31 Dec 2019 15:21:59 +0800 Subject: [PATCH 1/2] dont drop read index requests when not ready In the current implementation leader will drop read index requests when it hasn't committed to its term. This could introduce high latency during leader transferring for follower read. Signed-off-by: qupeng --- Cargo.toml | 2 +- harness/Cargo.toml | 2 +- src/raft.rs | 116 ++++++++++++++++++++------------------------- src/read_only.rs | 31 +++++++++--- 4 files changed, 78 insertions(+), 73 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8cd2208bf..f3735f487 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ quick-error = "1.2.2" raft-proto = { path = "proto", version = "0.6.0-alpha", default-features = false } # Stick with 0.6 (rather than 0.7) for now, since most of the ecosystem has not # upgraded yet and this saves duplicate deps. -rand = "0.6.5" +rand = "0.7" fxhash = "0.2.1" fail = { version = "0.3", optional = true } getset = "0.0.7" diff --git a/harness/Cargo.toml b/harness/Cargo.toml index f8ab32cbc..95176126f 100644 --- a/harness/Cargo.toml +++ b/harness/Cargo.toml @@ -19,7 +19,7 @@ prost-codec = ["raft/prost-codec"] # Make sure to synchronize updates with Raft. [dependencies] raft = { path = "..", default-features = false } -rand = "0.6.5" +rand = "0.7" slog = "2.2" [dev-dependencies] diff --git a/src/raft.rs b/src/raft.rs index 6d972ec77..422e0bcf1 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -628,7 +628,11 @@ impl Raft { /// changed (in which case the caller should call `r.bcast_append`). pub fn maybe_commit(&mut self) -> bool { let mci = self.prs().maximal_committed_index(); - self.raft_log.maybe_commit(mci, self.term) + if self.raft_log.maybe_commit(mci, self.term) { + self.read_only.advance_by_commit(self.raft_log.committed); + return true; + } + false } /// Commit that the Raft peer has applied up to the given index. @@ -1307,23 +1311,10 @@ impl Raft { return; } - let rss = self.read_only.advance(m, &self.logger); - for rs in rss { - let mut req = rs.req; - if req.from == INVALID_ID || req.from == self.id { - // from local member - let rs = ReadState { - index: rs.index, - request_ctx: req.take_entries()[0].take_data(), - }; - self.read_states.push(rs); - } else { - let mut to_send = Message::default(); - to_send.set_msg_type(MessageType::MsgReadIndexResp); - to_send.to = req.from; - to_send.index = rs.index; - to_send.set_entries(req.take_entries()); - more_to_send.push(to_send); + let ready = self.ready_to_read_index(); + for rs in self.read_only.advance(m, ready) { + if let Some(m) = self.response_ready_read(rs.req, rs.index) { + more_to_send.push(m); } } } @@ -1530,62 +1521,34 @@ impl Raft { return Ok(()); } MessageType::MsgReadIndex => { - if self.raft_log.term(self.raft_log.committed).unwrap_or(0) != self.term { - // Reject read only request when this leader has not committed any log entry - // in its term. + let mut self_set = HashSet::default(); + self_set.insert(self.id); + if self.prs().has_quorum(&self_set) { + // There is only one voting member (the leader) in the cluster. + debug_assert!(self.ready_to_read_index()); + if let Some(m) = self.response_ready_read(m, self.raft_log.committed) { + self.send(m); + } return Ok(()); } - let mut self_set = HashSet::default(); - self_set.insert(self.id); - if !self.prs().has_quorum(&self_set) { + match self.read_only.option { // thinking: use an interally defined context instead of the user given context. // We can express this in terms of the term and index instead of // a user-supplied value. // This would allow multiple reads to piggyback on the same message. - match self.read_only.option { - ReadOnlyOption::Safe => { - let ctx = m.entries[0].data.to_vec(); - self.read_only.add_request(self.raft_log.committed, m); - self.bcast_heartbeat_with_ctx(Some(ctx)); - } - ReadOnlyOption::LeaseBased => { - let read_index = self.raft_log.committed; - if m.from == INVALID_ID || m.from == self.id { - // from local member - let rs = ReadState { - index: read_index, - request_ctx: m.take_entries()[0].take_data(), - }; - self.read_states.push(rs); - } else { - let mut to_send = Message::default(); - to_send.set_msg_type(MessageType::MsgReadIndexResp); - to_send.to = m.from; - to_send.index = read_index; - to_send.set_entries(m.take_entries()); - self.send(to_send); - } - } + ReadOnlyOption::Safe => { + let ctx = m.entries[0].data.to_vec(); + self.read_only.add_request(self.raft_log.committed, m); + self.bcast_heartbeat_with_ctx(Some(ctx)); } - } else { - // there is only one voting member (the leader) in the cluster - if m.from == INVALID_ID || m.from == self.id { - // from leader itself - let rs = ReadState { - index: self.raft_log.committed, - request_ctx: m.take_entries()[0].take_data(), - }; - self.read_states.push(rs); - } else { - // from learner member - let mut to_send = Message::default(); - to_send.set_msg_type(MessageType::MsgReadIndexResp); - to_send.to = m.from; - to_send.index = self.raft_log.committed; - to_send.set_entries(m.take_entries()); - self.send(to_send); + ReadOnlyOption::LeaseBased if self.ready_to_read_index() => { + if let Some(m) = self.response_ready_read(m, self.raft_log.committed) { + self.send(m); + } } + // ReadIndex request is dropped silently. + ReadOnlyOption::LeaseBased => {} } return Ok(()); } @@ -2204,6 +2167,29 @@ impl Raft { self.lead_transferee = None; } + /// Test the peer is ready to response `ReadIndex` requests or not. + pub fn ready_to_read_index(&self) -> bool { + let read_index = self.raft_log.committed; + self.raft_log.term(read_index).unwrap_or(0) == self.term + } + + fn response_ready_read(&mut self, mut req: Message, index: u64) -> Option { + if req.from == INVALID_ID || req.from == self.id { + let rs = ReadState { + index, + request_ctx: req.take_entries()[0].take_data(), + }; + self.read_states.push(rs); + return None; + } + let mut to_send = Message::default(); + to_send.set_msg_type(MessageType::MsgReadIndexResp); + to_send.to = req.from; + to_send.index = index; + to_send.set_entries(req.take_entries()); + Some(to_send) + } + fn send_request_snapshot(&mut self) { let mut m = Message::default(); m.set_msg_type(MessageType::MsgAppendResponse); diff --git a/src/read_only.rs b/src/read_only.rs index cc3e330da..a3859599c 100644 --- a/src/read_only.rs +++ b/src/read_only.rs @@ -16,8 +16,6 @@ use std::collections::VecDeque; -use slog::Logger; - use crate::eraftpb::Message; use crate::{HashMap, HashSet}; @@ -66,6 +64,9 @@ pub struct ReadOnly { pub option: ReadOnlyOption, pub pending_read_index: HashMap, ReadIndexStatus>, pub read_index_queue: VecDeque>, + // Items in `read_index_queue` with index *less* than `waiting_for_ready` + // are pending because the peer hasn't committed to its term. + waiting_for_ready: usize, } impl ReadOnly { @@ -74,6 +75,7 @@ impl ReadOnly { option, pending_read_index: HashMap::default(), read_index_queue: VecDeque::new(), + waiting_for_ready: 0, } } @@ -119,14 +121,16 @@ impl ReadOnly { /// Advances the read only request queue kept by the ReadOnly struct. /// It dequeues the requests until it finds the read only request that has /// the same context as the given `m`. - pub fn advance(&mut self, m: &Message, logger: &Logger) -> Vec { + pub fn advance(&mut self, m: &Message, ready: bool) -> Vec { let mut rss = vec![]; if let Some(i) = self.read_index_queue.iter().position(|x| { - if !self.pending_read_index.contains_key(x) { - fatal!(logger, "cannot find correspond read state from pending map"); - } + debug_assert!(self.pending_read_index.contains_key(x)); *x == m.context }) { + if !ready { + self.waiting_for_ready = std::cmp::max(self.waiting_for_ready, i + 1); + return rss; + } for _ in 0..=i { let rs = self.read_index_queue.pop_front().unwrap(); let status = self.pending_read_index.remove(&rs).unwrap(); @@ -136,6 +140,21 @@ impl ReadOnly { rss } + pub(crate) fn advance_by_commit(&mut self, committed: u64) -> Vec { + let mut rss = vec![]; + if self.waiting_for_ready > 0 { + let remained = self.read_index_queue.split_off(self.waiting_for_ready); + self.waiting_for_ready = 0; + for rs in std::mem::replace(&mut self.read_index_queue, remained) { + let mut status = self.pending_read_index.remove(&rs).unwrap(); + // Use latest committed index to avoid stale read on follower peers. + status.index = committed; + rss.push(status); + } + } + rss + } + /// Returns the context of the last pending read only request in ReadOnly struct. pub fn last_pending_request_ctx(&self) -> Option> { self.read_index_queue.back().cloned() From 21734cf3ab36c2198f1f6492f238dea883834ab8 Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 31 Dec 2019 15:53:21 +0800 Subject: [PATCH 2/2] add a test case Signed-off-by: qupeng --- harness/tests/integration_cases/test_raft.rs | 7 +++++-- src/raft.rs | 6 +++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index 6d02fdea2..6ad741c03 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -2394,7 +2394,7 @@ fn test_read_only_for_new_leader() { nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); assert_eq!(nt.peers[&1].state, StateRole::Leader); - // Ensure peer 1 drops read only request. + // Ensure peer 1 can't response read only requests immediately. let windex = 5; let wctx = "ctx"; nt.send(vec![new_message_with_entries( @@ -2407,11 +2407,14 @@ fn test_read_only_for_new_leader() { nt.recover(); - // Force peer 1 to commit a log entry at its term. + // Force peer 1 to commit a log entry at its term, then old read requests will be responsed. for _ in 0..heartbeat_ticks { nt.peers.get_mut(&1).unwrap().tick(); } nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); + assert_eq!(nt.peers[&1].read_states.len(), 1); + nt.peers.get_mut(&1).unwrap().read_states.clear(); + assert_eq!(nt.peers[&1].raft_log.committed, 5); assert_eq!( nt.peers[&1] diff --git a/src/raft.rs b/src/raft.rs index 422e0bcf1..2d67c4b22 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -629,7 +629,11 @@ impl Raft { pub fn maybe_commit(&mut self) -> bool { let mci = self.prs().maximal_committed_index(); if self.raft_log.maybe_commit(mci, self.term) { - self.read_only.advance_by_commit(self.raft_log.committed); + for rs in self.read_only.advance_by_commit(self.raft_log.committed) { + if let Some(m) = self.response_ready_read(rs.req, rs.index) { + self.send(m); + } + } return true; } false