diff --git a/harness/tests/integration_cases/test_raft.rs b/harness/tests/integration_cases/test_raft.rs index ef91274e1..1257436b7 100644 --- a/harness/tests/integration_cases/test_raft.rs +++ b/harness/tests/integration_cases/test_raft.rs @@ -2394,7 +2394,7 @@ fn test_read_only_for_new_leader() { nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); assert_eq!(nt.peers[&1].state, StateRole::Leader); - // Ensure peer 1 drops read only request. + // Ensure peer 1 can't respond to read only requests immediately. let windex = 5; let wctx = "ctx"; nt.send(vec![new_message_with_entries( @@ -2407,11 +2407,14 @@ fn test_read_only_for_new_leader() { nt.recover(); - // Force peer 1 to commit a log entry at its term. + // Force peer 1 to commit a log entry at its term, then old read requests will be responded to. for _ in 0..heartbeat_ticks { nt.peers.get_mut(&1).unwrap().tick(); } nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); + assert_eq!(nt.peers[&1].read_states.len(), 1); + nt.peers.get_mut(&1).unwrap().read_states.clear(); + assert_eq!(nt.peers[&1].raft_log.committed, 5); assert_eq!( nt.peers[&1] diff --git a/src/raft.rs b/src/raft.rs index 6d972ec77..2d67c4b22 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -628,7 +628,15 @@ impl<T: Storage> Raft<T> { /// changed (in which case the caller should call `r.bcast_append`). pub fn maybe_commit(&mut self) -> bool { let mci = self.prs().maximal_committed_index(); - self.raft_log.maybe_commit(mci, self.term) + if self.raft_log.maybe_commit(mci, self.term) { + for rs in self.read_only.advance_by_commit(self.raft_log.committed) { + if let Some(m) = self.response_ready_read(rs.req, rs.index) { + self.send(m); + } + } + return true; + } + false } /// Commit that the Raft peer has applied up to the given index. 
@@ -1307,23 +1315,10 @@ impl Raft { return; } - let rss = self.read_only.advance(m, &self.logger); - for rs in rss { - let mut req = rs.req; - if req.from == INVALID_ID || req.from == self.id { - // from local member - let rs = ReadState { - index: rs.index, - request_ctx: req.take_entries()[0].take_data(), - }; - self.read_states.push(rs); - } else { - let mut to_send = Message::default(); - to_send.set_msg_type(MessageType::MsgReadIndexResp); - to_send.to = req.from; - to_send.index = rs.index; - to_send.set_entries(req.take_entries()); - more_to_send.push(to_send); + let ready = self.ready_to_read_index(); + for rs in self.read_only.advance(m, ready) { + if let Some(m) = self.response_ready_read(rs.req, rs.index) { + more_to_send.push(m); } } } @@ -1530,62 +1525,34 @@ impl Raft { return Ok(()); } MessageType::MsgReadIndex => { - if self.raft_log.term(self.raft_log.committed).unwrap_or(0) != self.term { - // Reject read only request when this leader has not committed any log entry - // in its term. + let mut self_set = HashSet::default(); + self_set.insert(self.id); + if self.prs().has_quorum(&self_set) { + // There is only one voting member (the leader) in the cluster. + debug_assert!(self.ready_to_read_index()); + if let Some(m) = self.response_ready_read(m, self.raft_log.committed) { + self.send(m); + } return Ok(()); } - let mut self_set = HashSet::default(); - self_set.insert(self.id); - if !self.prs().has_quorum(&self_set) { + match self.read_only.option { // thinking: use an interally defined context instead of the user given context. // We can express this in terms of the term and index instead of // a user-supplied value. // This would allow multiple reads to piggyback on the same message. 
- match self.read_only.option { - ReadOnlyOption::Safe => { - let ctx = m.entries[0].data.to_vec(); - self.read_only.add_request(self.raft_log.committed, m); - self.bcast_heartbeat_with_ctx(Some(ctx)); - } - ReadOnlyOption::LeaseBased => { - let read_index = self.raft_log.committed; - if m.from == INVALID_ID || m.from == self.id { - // from local member - let rs = ReadState { - index: read_index, - request_ctx: m.take_entries()[0].take_data(), - }; - self.read_states.push(rs); - } else { - let mut to_send = Message::default(); - to_send.set_msg_type(MessageType::MsgReadIndexResp); - to_send.to = m.from; - to_send.index = read_index; - to_send.set_entries(m.take_entries()); - self.send(to_send); - } - } + ReadOnlyOption::Safe => { + let ctx = m.entries[0].data.to_vec(); + self.read_only.add_request(self.raft_log.committed, m); + self.bcast_heartbeat_with_ctx(Some(ctx)); } - } else { - // there is only one voting member (the leader) in the cluster - if m.from == INVALID_ID || m.from == self.id { - // from leader itself - let rs = ReadState { - index: self.raft_log.committed, - request_ctx: m.take_entries()[0].take_data(), - }; - self.read_states.push(rs); - } else { - // from learner member - let mut to_send = Message::default(); - to_send.set_msg_type(MessageType::MsgReadIndexResp); - to_send.to = m.from; - to_send.index = self.raft_log.committed; - to_send.set_entries(m.take_entries()); - self.send(to_send); + ReadOnlyOption::LeaseBased if self.ready_to_read_index() => { + if let Some(m) = self.response_ready_read(m, self.raft_log.committed) { + self.send(m); + } } + // ReadIndex request is dropped silently. + ReadOnlyOption::LeaseBased => {} } return Ok(()); } @@ -2204,6 +2171,29 @@ impl<T: Storage> Raft<T> { self.lead_transferee = None; } + /// Tests whether the peer is ready to respond to `ReadIndex` requests. 
+ pub fn ready_to_read_index(&self) -> bool { + let read_index = self.raft_log.committed; + self.raft_log.term(read_index).unwrap_or(0) == self.term + } + + fn response_ready_read(&mut self, mut req: Message, index: u64) -> Option<Message> { + if req.from == INVALID_ID || req.from == self.id { + let rs = ReadState { + index, + request_ctx: req.take_entries()[0].take_data(), + }; + self.read_states.push(rs); + return None; + } + let mut to_send = Message::default(); + to_send.set_msg_type(MessageType::MsgReadIndexResp); + to_send.to = req.from; + to_send.index = index; + to_send.set_entries(req.take_entries()); + Some(to_send) + } + fn send_request_snapshot(&mut self) { let mut m = Message::default(); m.set_msg_type(MessageType::MsgAppendResponse); diff --git a/src/read_only.rs b/src/read_only.rs index cc3e330da..a3859599c 100644 --- a/src/read_only.rs +++ b/src/read_only.rs @@ -16,8 +16,6 @@ use std::collections::VecDeque; -use slog::Logger; - use crate::eraftpb::Message; use crate::{HashMap, HashSet}; @@ -66,6 +64,9 @@ pub struct ReadOnly { pub option: ReadOnlyOption, pub pending_read_index: HashMap<Vec<u8>, ReadIndexStatus>, pub read_index_queue: VecDeque<Vec<u8>>, + // Items in `read_index_queue` with index *less* than `waiting_for_ready` + // are pending because the peer hasn't committed an entry in its term yet. + waiting_for_ready: usize, } impl ReadOnly { @@ -74,6 +75,7 @@ option, pending_read_index: HashMap::default(), read_index_queue: VecDeque::new(), + waiting_for_ready: 0, } } @@ -119,14 +121,16 @@ /// Advances the read only request queue kept by the ReadOnly struct. /// It dequeues the requests until it finds the read only request that has /// the same context as the given `m`. 
- pub fn advance(&mut self, m: &Message, logger: &Logger) -> Vec<ReadIndexStatus> { + pub fn advance(&mut self, m: &Message, ready: bool) -> Vec<ReadIndexStatus> { let mut rss = vec![]; if let Some(i) = self.read_index_queue.iter().position(|x| { - if !self.pending_read_index.contains_key(x) { - fatal!(logger, "cannot find correspond read state from pending map"); - } + debug_assert!(self.pending_read_index.contains_key(x)); *x == m.context }) { + if !ready { + self.waiting_for_ready = std::cmp::max(self.waiting_for_ready, i + 1); + return rss; + } for _ in 0..=i { let rs = self.read_index_queue.pop_front().unwrap(); let status = self.pending_read_index.remove(&rs).unwrap(); @@ -136,6 +140,21 @@ rss } + pub(crate) fn advance_by_commit(&mut self, committed: u64) -> Vec<ReadIndexStatus> { + let mut rss = vec![]; + if self.waiting_for_ready > 0 { + let remained = self.read_index_queue.split_off(self.waiting_for_ready); + self.waiting_for_ready = 0; + for rs in std::mem::replace(&mut self.read_index_queue, remained) { + let mut status = self.pending_read_index.remove(&rs).unwrap(); + // Use latest committed index to avoid stale read on follower peers. + status.index = committed; + rss.push(status); + } + } + rss + } + /// Returns the context of the last pending read only request in ReadOnly struct. pub fn last_pending_request_ctx(&self) -> Option<Vec<u8>> { self.read_index_queue.back().cloned()