Skip to content

Commit

Permalink
raftstore: reduce message flush (tikv#5475)
Browse files Browse the repository at this point in the history
* raftstore: reduce message flush

Signed-off-by: Jay Lee <[email protected]>
  • Loading branch information
BusyJay committed Oct 10, 2019
1 parent 99a0fa8 commit 79ed168
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 14 deletions.
6 changes: 6 additions & 0 deletions components/tikv_util/src/mpsc/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ pub struct Receiver<T> {
}

impl<T> Sender<T> {
pub fn is_empty(&self) -> bool {
// When there is no sender references, it can't be known whether
// it's empty or not.
self.sender.as_ref().map_or(false, |s| s.is_empty())
}

#[inline]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
self.sender.as_ref().unwrap().send(t)?;
Expand Down
12 changes: 9 additions & 3 deletions src/raftstore/store/fsm/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ pub trait PollHandler<N, C> {

/// This function is called at the end of every round.
fn end(&mut self, batch: &mut [Box<N>]);

/// This function is called when batch system is going to sleep.
fn pause(&mut self) {}
}

/// Internal poller that fetches batch and call handler hooks for readiness.
Expand All @@ -279,16 +282,19 @@ struct Poller<N: Fsm, C: Fsm, Handler> {
}

impl<N: Fsm, C: Fsm, Handler: PollHandler<N, C>> Poller<N, C, Handler> {
fn fetch_batch(&self, batch: &mut Batch<N, C>, max_size: usize) {
fn fetch_batch(&mut self, batch: &mut Batch<N, C>, max_size: usize) {
let curr_batch_len = batch.len();
if batch.control.is_some() || curr_batch_len >= max_size {
// Do nothing if there's a pending control fsm or the batch is already full.
return;
}

let mut pushed = if curr_batch_len == 0 {
// Block if the batch is empty.
match self.fsm_receiver.recv() {
match self.fsm_receiver.try_recv().or_else(|_| {
self.handler.pause();
// Block if the batch is empty.
self.fsm_receiver.recv()
}) {
Ok(fsm) => batch.push(fsm),
Err(_) => return,
}
Expand Down
21 changes: 10 additions & 11 deletions src/raftstore/store/fsm/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,9 @@ impl<T: Transport, C: PdClient> RaftPoller<T, C> {
.schedule_task(prop.region_id, ApplyTask::Proposal(prop));
}
}
if self.poll_ctx.need_flush_trans {
if self.poll_ctx.need_flush_trans
&& (!self.poll_ctx.kv_wb.is_empty() || !self.poll_ctx.raft_wb.is_empty())
{
self.poll_ctx.trans.flush();
self.poll_ctx.need_flush_trans = false;
}
Expand Down Expand Up @@ -543,11 +545,6 @@ impl<T: Transport, C: PdClient> RaftPoller<T, C> {
.append_log
.observe(duration_to_sec(dur) as f64);

if self.poll_ctx.need_flush_trans {
self.poll_ctx.trans.flush();
self.poll_ctx.need_flush_trans = false;
}

slow_log!(
self.timer,
"{} handle {} pending peers include {} ready, {} entries, {} messages and {} \
Expand All @@ -568,7 +565,6 @@ impl<T: Transport, C: PdClient> PollHandler<PeerFsm, StoreFsm> for RaftPoller<T,
self.poll_ctx.pending_count = 0;
self.poll_ctx.sync_log = false;
self.poll_ctx.has_ready = false;
self.poll_ctx.need_flush_trans = false;
if self.pending_proposals.capacity() == 0 {
self.pending_proposals = Vec::with_capacity(batch_size);
}
Expand Down Expand Up @@ -654,10 +650,6 @@ impl<T: Transport, C: PdClient> PollHandler<PeerFsm, StoreFsm> for RaftPoller<T,
if self.poll_ctx.has_ready {
self.handle_raft_ready(peers);
}
if self.poll_ctx.need_flush_trans {
self.poll_ctx.trans.flush();
self.poll_ctx.need_flush_trans = false;
}
if !self.poll_ctx.queued_snapshot.is_empty() {
let mut meta = self.poll_ctx.store_meta.lock().unwrap();
meta.pending_snapshot_regions
Expand All @@ -671,6 +663,13 @@ impl<T: Transport, C: PdClient> PollHandler<PeerFsm, StoreFsm> for RaftPoller<T,
self.poll_ctx.raft_metrics.flush();
self.poll_ctx.store_stat.flush();
}

fn pause(&mut self) {
if self.poll_ctx.need_flush_trans {
self.poll_ctx.trans.flush();
self.poll_ctx.need_flush_trans = false;
}
}
}

pub struct RaftPollerBuilder<T, C> {
Expand Down
3 changes: 3 additions & 0 deletions src/server/raft_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ impl<T: RaftStoreRouter> RaftClient<T> {
pub fn flush(&mut self) {
let (mut counter, mut delay_counter) = (0, 0);
for conn in self.conns.values_mut() {
if conn.stream.is_empty() {
continue;
}
if let Some(notifier) = conn.stream.get_notifier() {
if !self.grpc_thread_load.in_heavy_load() {
notifier.notify();
Expand Down

0 comments on commit 79ed168

Please sign in to comment.