
don't drop read index requests when not ready #332

Open · wants to merge 3 commits into base: master
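A Raft leader must not answer `ReadIndex` requests until it has committed an entry in its own term, since before that its commit index may lag behind earlier leaders (the read-only safety rule from §6.4 of the Raft dissertation). Previously such requests were rejected outright, so clients had to retry. With this change, `ReadOnlyOption::Safe` requests received during that window are parked in the `ReadOnly` queue and answered from `maybe_commit` once the leader commits at its own term; `LeaseBased` requests in that window are still dropped silently, and a single-voter cluster answers immediately. A rough model of the new leader-side handling (the enum and function below are illustrative, not the crate's API):

// Illustrative decision table for MsgReadIndex on a leader; made up for
// this sketch, not part of the crate.
#[derive(Debug, PartialEq)]
enum Action {
    AnswerNow,        // already committed at own term, or single-voter cluster
    QueueUntilCommit, // ReadOnlyOption::Safe while not ready (new behavior)
    DropSilently,     // ReadOnlyOption::LeaseBased while not ready (unchanged)
}

fn handle_read_index(ready: bool, single_voter: bool, lease_based: bool) -> Action {
    if single_voter || ready {
        Action::AnswerNow
    } else if lease_based {
        Action::DropSilently
    } else {
        Action::QueueUntilCommit
    }
}

fn main() {
    // A freshly elected leader no longer loses quorum reads: they are queued.
    assert_eq!(handle_read_index(false, false, false), Action::QueueUntilCommit);
    assert_eq!(handle_read_index(false, false, true), Action::DropSilently);
    assert_eq!(handle_read_index(true, false, false), Action::AnswerNow);
}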
7 changes: 5 additions & 2 deletions harness/tests/integration_cases/test_raft.rs
@@ -2394,7 +2394,7 @@ fn test_read_only_for_new_leader() {
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
assert_eq!(nt.peers[&1].state, StateRole::Leader);

// Ensure peer 1 drops read only request.
// Ensure peer 1 can't respond to read only requests immediately.
let windex = 5;
let wctx = "ctx";
nt.send(vec![new_message_with_entries(
@@ -2407,11 +2407,14 @@

nt.recover();

// Force peer 1 to commit a log entry at its term.
// Force peer 1 to commit a log entry at its term, then the old read requests will be answered.
for _ in 0..heartbeat_ticks {
nt.peers.get_mut(&1).unwrap().tick();
}
nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]);
assert_eq!(nt.peers[&1].read_states.len(), 1);
nt.peers.get_mut(&1).unwrap().read_states.clear();

assert_eq!(nt.peers[&1].raft_log.committed, 5);
assert_eq!(
nt.peers[&1]
120 changes: 55 additions & 65 deletions src/raft.rs
@@ -628,7 +628,15 @@ impl<T: Storage> Raft<T> {
/// changed (in which case the caller should call `r.bcast_append`).
pub fn maybe_commit(&mut self) -> bool {
let mci = self.prs().maximal_committed_index();
self.raft_log.maybe_commit(mci, self.term)
if self.raft_log.maybe_commit(mci, self.term) {
for rs in self.read_only.advance_by_commit(self.raft_log.committed) {
if let Some(m) = self.response_ready_read(rs.req, rs.index) {
self.send(m);
}
}
return true;
}
false
}

/// Commit that the Raft peer has applied up to the given index.
@@ -1307,23 +1315,10 @@ impl<T: Storage> Raft<T> {
return;
}

let rss = self.read_only.advance(m, &self.logger);
for rs in rss {
let mut req = rs.req;
if req.from == INVALID_ID || req.from == self.id {
// from local member
let rs = ReadState {
index: rs.index,
request_ctx: req.take_entries()[0].take_data(),
};
self.read_states.push(rs);
} else {
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.to = req.from;
to_send.index = rs.index;
to_send.set_entries(req.take_entries());
more_to_send.push(to_send);
let ready = self.ready_to_read_index();
for rs in self.read_only.advance(m, ready) {
if let Some(m) = self.response_ready_read(rs.req, rs.index) {
more_to_send.push(m);
}
}
}
@@ -1530,62 +1525,34 @@ impl<T: Storage> Raft<T> {
return Ok(());
}
MessageType::MsgReadIndex => {
if self.raft_log.term(self.raft_log.committed).unwrap_or(0) != self.term {
// Reject read only request when this leader has not committed any log entry
// in its term.
let mut self_set = HashSet::default();
self_set.insert(self.id);
if self.prs().has_quorum(&self_set) {
// There is only one voting member (the leader) in the cluster.
debug_assert!(self.ready_to_read_index());
if let Some(m) = self.response_ready_read(m, self.raft_log.committed) {
self.send(m);
}
return Ok(());
}

let mut self_set = HashSet::default();
self_set.insert(self.id);
if !self.prs().has_quorum(&self_set) {
match self.read_only.option {
// Thinking: use an internally defined context instead of the user-given context.
// We can express this in terms of the term and index instead of
// a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
match self.read_only.option {
ReadOnlyOption::Safe => {
let ctx = m.entries[0].data.to_vec();
self.read_only.add_request(self.raft_log.committed, m);
self.bcast_heartbeat_with_ctx(Some(ctx));
}
ReadOnlyOption::LeaseBased => {
let read_index = self.raft_log.committed;
if m.from == INVALID_ID || m.from == self.id {
// from local member
let rs = ReadState {
index: read_index,
request_ctx: m.take_entries()[0].take_data(),
};
self.read_states.push(rs);
} else {
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.to = m.from;
to_send.index = read_index;
to_send.set_entries(m.take_entries());
self.send(to_send);
}
}
ReadOnlyOption::Safe => {
let ctx = m.entries[0].data.to_vec();
self.read_only.add_request(self.raft_log.committed, m);
self.bcast_heartbeat_with_ctx(Some(ctx));
}
} else {
// there is only one voting member (the leader) in the cluster
if m.from == INVALID_ID || m.from == self.id {
// from leader itself
let rs = ReadState {
index: self.raft_log.committed,
request_ctx: m.take_entries()[0].take_data(),
};
self.read_states.push(rs);
} else {
// from learner member
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.to = m.from;
to_send.index = self.raft_log.committed;
to_send.set_entries(m.take_entries());
self.send(to_send);
ReadOnlyOption::LeaseBased if self.ready_to_read_index() => {
if let Some(m) = self.response_ready_read(m, self.raft_log.committed) {
self.send(m);
}
}
// ReadIndex request is dropped silently.
ReadOnlyOption::LeaseBased => {}
}
return Ok(());
}
@@ -2204,6 +2171,29 @@ impl<T: Storage> Raft<T> {
self.lead_transferee = None;
}

/// Tests whether the peer is ready to respond to `ReadIndex` requests.
pub fn ready_to_read_index(&self) -> bool {
let read_index = self.raft_log.committed;
self.raft_log.term(read_index).unwrap_or(0) == self.term
}

fn response_ready_read(&mut self, mut req: Message, index: u64) -> Option<Message> {
if req.from == INVALID_ID || req.from == self.id {
let rs = ReadState {
index,
request_ctx: req.take_entries()[0].take_data(),
};
self.read_states.push(rs);
return None;
}
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgReadIndexResp);
to_send.to = req.from;
to_send.index = index;
to_send.set_entries(req.take_entries());
Some(to_send)
}

fn send_request_snapshot(&mut self) {
let mut m = Message::default();
m.set_msg_type(MessageType::MsgAppendResponse);
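A quick walkthrough of the readiness predicate added above: a newly elected leader's commit index can still point at an entry from a previous term, and only after committing an entry at its current term is it safe to serve reads (Raft dissertation §6.4). A self-contained sketch with a made-up toy log:

// `ToyLog` and `ready` are illustrative stand-ins, not the crate's types.
struct ToyLog {
    terms: Vec<u64>,  // terms[i] = term of the entry at log index i (0 unused)
    committed: usize, // commit index
}

fn ready(log: &ToyLog, leader_term: u64) -> bool {
    // Mirrors `ready_to_read_index`: term(committed) == self.term.
    log.terms.get(log.committed).copied().unwrap_or(0) == leader_term
}

fn main() {
    // A node wins the election for term 7 while everything committed is from term 6.
    let mut log = ToyLog { terms: vec![0, 6, 6, 6, 6], committed: 4 };
    assert!(!ready(&log, 7)); // must not answer ReadIndex yet

    // The leader appends and commits its no-op entry at term 7.
    log.terms.push(7);
    log.committed = 5;
    assert!(ready(&log, 7)); // parked ReadIndex requests can now be released
}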
31 changes: 25 additions & 6 deletions src/read_only.rs
@@ -16,8 +16,6 @@

use std::collections::VecDeque;

use slog::Logger;

use crate::eraftpb::Message;
use crate::{HashMap, HashSet};

@@ -66,6 +64,9 @@ pub struct ReadOnly {
pub option: ReadOnlyOption,
pub pending_read_index: HashMap<Vec<u8>, ReadIndexStatus>,
pub read_index_queue: VecDeque<Vec<u8>>,
// Items at positions *less* than `waiting_for_ready` in `read_index_queue` are
// parked because the leader has not yet committed an entry in its current term.
waiting_for_ready: usize,
}

impl ReadOnly {
@@ -74,6 +75,7 @@ impl ReadOnly {
option,
pending_read_index: HashMap::default(),
read_index_queue: VecDeque::new(),
waiting_for_ready: 0,
}
}

@@ -119,14 +121,16 @@
/// Advances the read only request queue kept by the ReadOnly struct.
/// It dequeues the requests until it finds the read only request that has
/// the same context as the given `m`.
pub fn advance(&mut self, m: &Message, logger: &Logger) -> Vec<ReadIndexStatus> {
pub fn advance(&mut self, m: &Message, ready: bool) -> Vec<ReadIndexStatus> {
let mut rss = vec![];
if let Some(i) = self.read_index_queue.iter().position(|x| {
if !self.pending_read_index.contains_key(x) {
fatal!(logger, "cannot find correspond read state from pending map");
}
debug_assert!(self.pending_read_index.contains_key(x));
*x == m.context
}) {
if !ready {
self.waiting_for_ready = std::cmp::max(self.waiting_for_ready, i + 1);
return rss;
}
for _ in 0..=i {
let rs = self.read_index_queue.pop_front().unwrap();
let status = self.pending_read_index.remove(&rs).unwrap();
@@ -136,6 +140,21 @@
rss
}

pub(crate) fn advance_by_commit(&mut self, committed: u64) -> Vec<ReadIndexStatus> {
let mut rss = vec![];
if self.waiting_for_ready > 0 {
let remained = self.read_index_queue.split_off(self.waiting_for_ready);
self.waiting_for_ready = 0;
for rs in std::mem::replace(&mut self.read_index_queue, remained) {
let mut status = self.pending_read_index.remove(&rs).unwrap();
// Use the latest committed index to avoid stale reads on follower peers.
status.index = committed;
rss.push(status);
}
}
rss
}

/// Returns the context of the last pending read only request in the `ReadOnly` struct.
pub fn last_pending_request_ctx(&self) -> Option<Vec<u8>> {
self.read_index_queue.back().cloned()
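Taken together, `advance` and `advance_by_commit` form a park-and-release queue: a quorum-confirmed prefix that arrives while the leader is not ready is remembered via `waiting_for_ready`, then released with a fresh index once the leader commits at its own term. A self-contained mirror of just that mechanism (simplified types, not the crate's API):

use std::collections::VecDeque;

// Simplified stand-in for `ReadOnly`: contexts are u64s and each queued
// request is a (ctx, read index) pair.
struct Queue {
    q: VecDeque<(u64, u64)>,
    waiting_for_ready: usize,
}

impl Queue {
    // Heartbeat quorum reached for `ctx`; drain up to it only if `ready`.
    fn advance(&mut self, ctx: u64, ready: bool) -> Vec<(u64, u64)> {
        let Some(i) = self.q.iter().position(|&(c, _)| c == ctx) else {
            return vec![];
        };
        if !ready {
            // Remember how far we *could* have drained; keep requests parked.
            self.waiting_for_ready = self.waiting_for_ready.max(i + 1);
            return vec![];
        }
        self.q.drain(..=i).collect()
    }

    // Leader just committed at its own term: release the parked prefix,
    // stamping each request with the latest committed index.
    fn advance_by_commit(&mut self, committed: u64) -> Vec<(u64, u64)> {
        let parked = std::mem::take(&mut self.waiting_for_ready);
        self.q.drain(..parked).map(|(c, _)| (c, committed)).collect()
    }
}

fn main() {
    let mut ro = Queue { q: VecDeque::from([(1, 4), (2, 4)]), waiting_for_ready: 0 };
    assert!(ro.advance(2, false).is_empty());   // heartbeat quorum, but leader not ready
    let released = ro.advance_by_commit(5);     // leader commits its no-op at index 5
    assert_eq!(released, vec![(1, 5), (2, 5)]); // answered with the fresh index
}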