Skip to content
This repository has been archived by the owner on Nov 23, 2023. It is now read-only.

Commit

Permalink
raft: paginate the unapplied config changes scan
Browse files Browse the repository at this point in the history
Fix potentially unlimited memory usage spike possible in raft.hup() which reads
all unapplied committed entries in order to check that there are no unapplied
config changes. This PR paginates this scan so that the spike is limited to
MaxCommittedSizePerReady. It also terminates the scan early if a config change
has been found.

It is ported from etcd-io/raft#32.
It is ported from tikv/raft-rs#530.

Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Oct 18, 2023
1 parent 75c8d73 commit 5bf990b
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 50 deletions.
92 changes: 44 additions & 48 deletions src/raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1168,12 +1168,6 @@ impl<T: Storage> Raft<T> {
trace!("EXIT become_leader");
}

fn num_pending_conf(&self, ents: &[Entry]) -> usize {
ents.iter()
.filter(|e| e.entry_type() == EntryType::EntryConfChange)
.count()
}

// Campaign to attempt to become a leader.
//
// If prevote is enabled, this is handled as well.
Expand Down Expand Up @@ -1430,40 +1424,24 @@ impl<T: Storage> Raft<T> {
return;
}

// Scan all the unapplied committed entries to find a config change.
// Paginate the scan, to avoid a potentially unlimited memory spike.
//
// If there is a pending snapshot, its index will be returned by
// `maybe_first_index`. Note that snapshot updates configuration
// already, so as long as pending entries don't contain conf change
// it's safe to start campaign.
let first_index = match self.raft_log.unstable.maybe_first_index() {
let low = match self.raft_log.unstable.maybe_first_index() {
Some(idx) => idx,
None => self.raft_log.applied + 1,
};

let ents = self
.raft_log
.slice(
first_index,
self.raft_log.committed + 1,
None,
GetEntriesContext(GetEntriesFor::TransferLeader),
)
.unwrap_or_else(|e| {
panic!(
"unexpected error getting unapplied entries [{}, {}): {:?}",
first_index,
self.raft_log.committed + 1,
e
);
});
let n = self.num_pending_conf(&ents);
if n != 0 {
warn!(
"cannot campaign at term {} since there are still {} pending configuration changes to apply",
self.term,
n
);
let high = self.raft_log.committed + 1;
let ctx = GetEntriesContext(GetEntriesFor::TransferLeader);
if self.has_unapplied_conf_changes(low, high, ctx) {
warn!("cannot campaign at term {} since there are still pending configuration changes to apply", self.term);
return;
}

info!(term = self.term, "starting a new election");
if transfer_leader {
self.campaign(CAMPAIGN_TRANSFER);
Expand All @@ -1474,6 +1452,35 @@ impl<T: Storage> Raft<T> {
}
}

fn has_unapplied_conf_changes(&self, lo: u64, hi: u64, context: GetEntriesContext) -> bool {
if self.raft_log.applied >= self.raft_log.committed {
// in fact applied == committed
return false;
}
let mut found = false;
// Reuse the max_committed_size_per_ready limit because it is used for
// similar purposes (limiting the read of unapplied committed entries)
// when raft sends entries via the Ready struct for application.
// TODO(pavelkalinnikov): find a way to budget memory/bandwidth for this scan
// outside the raft package.
let page_size = self.max_committed_size_per_ready;
if let Err(err) = self.raft_log.scan(lo, hi, page_size, context, |ents| {
for e in ents {
if e.entry_type() == EntryType::EntryConfChange {
found = true;
return false;
}
}
true
}) {
panic!(
"error scanning unapplied entries [{}, {}): {:?}",
lo, hi, err
);
}
found
}

fn log_vote_approve(&self, m: &Message) {
info!(
msg_type = ?m.msg_type(),
Expand Down Expand Up @@ -2028,23 +2035,12 @@ impl<T: Storage> Raft<T> {
return;
}

let ents = self
.raft_log
.slice(
last_commit + 1,
self.raft_log.committed + 1,
None,
GetEntriesContext(GetEntriesFor::CommitByVote),
)
.unwrap_or_else(|e| {
panic!(
"unexpected error getting unapplied entries [{}, {}): {:?}",
last_commit + 1,
self.raft_log.committed + 1,
e
);
});
if self.num_pending_conf(&ents) != 0 {
// Scan all unapplied committed entries to find a config change.
// Paginate the scan, to avoid a potentially unlimited memory spike.
let low = last_commit + 1;
let high = self.raft_log.committed + 1;
let ctx = GetEntriesContext(GetEntriesFor::CommitByVote);
if self.has_unapplied_conf_changes(low, high, ctx) {
// The candidate doesn't have to step down in theory, here just for best
// safety as we assume quorum won't change during election.
let term = self.term;
Expand Down
125 changes: 125 additions & 0 deletions src/raft/src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,41 @@ impl<T: Storage> RaftLog<T> {
}
}

/// Visit all log entries in the [lo, hi) range, and return them via the
/// given callback. The callback can be invoked multiple times, with consecutive
/// sub-ranges of the requested range. Returns up to page_size bytes worth of
/// entries at a time. May return more if a single entry size exceeds the limit.
///
/// The entries in [lo, hi) must exist, otherwise scan() eventually returns an
/// error.
///
/// If the callback returns false, scan terminates.
pub(crate) fn scan<F>(
&self,
mut lo: u64,
hi: u64,
page_size: u64,
context: GetEntriesContext,
mut v: F,
) -> Result<()>
where
F: FnMut(Vec<Entry>) -> bool,
{
while lo < hi {
let ents = self.slice(lo, hi, page_size, context)?;
if ents.is_empty() {
return Err(Error::Store(StorageError::Other(
format!("got 0 entries in [{}, {})", lo, hi).into(),
)));
}
lo += ents.len() as u64;
if !v(ents) {
return Ok(());
}
}
Ok(())
}

/// Grabs a slice of entries from the raft. Unlike a rust slice pointer, these are
/// returned by value. The result is truncated to the max_size in bytes.
pub fn slice(
Expand Down Expand Up @@ -634,6 +669,7 @@ mod test {
errors::{Error, StorageError},
raft_log::{self, RaftLog},
storage::{GetEntriesContext, MemStorage},
NO_LIMIT,
};

fn new_entry(index: u64, term: u64) -> eraftpb::Entry {
Expand All @@ -644,6 +680,14 @@ mod test {
}
}

fn ents_size(ents: &[eraftpb::Entry]) -> u64 {
let mut size = 0;
for ent in ents {
size += ent.encoded_len() as u64;
}
size
}

fn new_snapshot(meta_index: u64, meta_term: u64) -> eraftpb::Snapshot {
eraftpb::Snapshot {
metadata: Some(eraftpb::SnapshotMetadata {
Expand Down Expand Up @@ -1243,6 +1287,87 @@ mod test {
}
}

#[test]
fn test_scan() {
let offset = 47;
let num = 20;
let last = offset + num;
let half = offset + num / 2;
let entries = |from, to| {
let mut ents = vec![];
for i in from..to {
ents.push(new_entry(i, i));
}
ents
};
let entry_size = ents_size(&entries(half, half + 1));

let store = MemStorage::new();
store.wl().apply_snapshot(new_snapshot(offset, 0)).unwrap();
store.wl().append(&entries(offset + 1, half)).unwrap();
let mut raft_log = RaftLog::new(store);
raft_log.append(&entries(half, last));

// Test that scan() returns the same entries as slice(), on all inputs.
for page_size in [0, 1, 10, 100, entry_size, entry_size + 1] {
for lo in offset + 1..last {
for hi in lo..=last {
let mut got = vec![];
raft_log
.scan(lo, hi, page_size, GetEntriesContext::empty(false), |e| {
assert!(
e.len() == 1 || ents_size(&e) < page_size,
"{} {} {}",
e.len(),
ents_size(&e),
page_size
);
got.extend(e);
true
})
.unwrap();
let want = raft_log
.slice(lo, hi, NO_LIMIT, GetEntriesContext::empty(false))
.unwrap();
assert_eq!(
got, want,
"scan() and slice() mismatch on [{}, {}) @ {}",
lo, hi, page_size
);
}
}
}

// Test that the callback early return.
let mut iters = 0;
raft_log
.scan(offset + 1, half, 0, GetEntriesContext::empty(false), |_| {
iters += 1;
if iters == 2 {
return false;
}
true
})
.unwrap();
assert_eq!(iters, 2);

// Test that we max out the limit, and not just always return a single entry.
// NB: this test works only because the requested range length is even.
raft_log
.scan(
offset + 1,
offset + 11,
entry_size * 2,
GetEntriesContext::empty(false),
|e| {
assert_eq!(e.len(), 2);
assert_eq!(entry_size * 2, ents_size(&e));
true
},
)
.unwrap();
}

/// `test_log_maybe_append` ensures:
/// If the given (index, term) matches with the existing log:
/// 1. If an existing entry conflicts with a new one (same index
Expand Down
4 changes: 2 additions & 2 deletions src/raft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl RaftState {
}

/// Records the context of the caller who calls entries() of Storage trait.
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub struct GetEntriesContext(pub(crate) GetEntriesFor);

impl GetEntriesContext {
Expand All @@ -78,7 +78,7 @@ impl GetEntriesContext {
}
}

#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub(crate) enum GetEntriesFor {
// for sending entries to followers
SendAppend {
Expand Down

0 comments on commit 5bf990b

Please sign in to comment.