Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: paginate the unapplied config changes scan #530

Merged
merged 2 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 49 additions & 52 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1240,15 +1240,6 @@ impl<T: Storage> Raft<T> {
trace!(self.logger, "EXIT become_leader");
}

fn num_pending_conf(&self, ents: &[Entry]) -> usize {
ents.iter()
.filter(|e| {
e.get_entry_type() == EntryType::EntryConfChange
|| e.get_entry_type() == EntryType::EntryConfChangeV2
})
.count()
}

// Campaign to attempt to become a leader.
//
// If prevote is enabled, this is handled as well.
Expand Down Expand Up @@ -1517,43 +1508,27 @@ impl<T: Storage> Raft<T> {
return;
}

// Scan all unapplied committed entries to find a config change.
// Paginate the scan, to avoid a potentially unlimited memory spike.
//
// If there is a pending snapshot, its index will be returned by
// `maybe_first_index`. Note that snapshot updates configuration
// already, so as long as pending entries don't contain conf change
// it's safe to start campaign.
let first_index = match self.raft_log.unstable.maybe_first_index() {
let low = match self.raft_log.unstable.maybe_first_index() {
Some(idx) => idx,
None => self.raft_log.applied + 1,
};

let ents = self
.raft_log
.slice(
first_index,
self.raft_log.committed + 1,
None,
GetEntriesContext(GetEntriesFor::TransferLeader),
)
.unwrap_or_else(|e| {
fatal!(
self.logger,
"unexpected error getting unapplied entries [{}, {}): {:?}",
first_index,
self.raft_log.committed + 1,
e
);
});
let n = self.num_pending_conf(&ents);
if n != 0 {
let high = self.raft_log.committed + 1;
let ctx = GetEntriesContext(GetEntriesFor::TransferLeader);
if self.has_unapplied_conf_changes(low, high, ctx) {
warn!(
self.logger,
"cannot campaign at term {term} since there are still {pending_changes} pending \
configuration changes to apply",
term = self.term,
pending_changes = n;
"cannot campaign at term {} since there are still pending configuration changes to apply", self.term
);
return;
}

info!(
self.logger,
"starting a new election";
Expand All @@ -1568,6 +1543,40 @@ impl<T: Storage> Raft<T> {
}
}

fn has_unapplied_conf_changes(&self, lo: u64, hi: u64, context: GetEntriesContext) -> bool {
if self.raft_log.applied >= self.raft_log.committed {
// in fact applied == committed
return false;
}
let mut found = false;
// Reuse the max_committed_size_per_ready limit because it is used for
// similar purposes (limiting the read of unapplied committed entries)
// when raft sends entries via the Ready struct for application.
// TODO(pavelkalinnikov): find a way to budget memory/bandwidth for this scan
// outside the raft package.
let page_size = self.max_committed_size_per_ready;
if let Err(err) = self.raft_log.scan(lo, hi, page_size, context, |ents| {
for e in ents {
if e.get_entry_type() == EntryType::EntryConfChange
|| e.get_entry_type() == EntryType::EntryConfChangeV2
{
found = true;
return false;
}
}
true
}) {
fatal!(
self.logger,
"error scanning unapplied entries [{}, {}): {:?}",
lo,
hi,
err
);
}
found
}

fn log_vote_approve(&self, m: &Message) {
info!(
self.logger,
Expand Down Expand Up @@ -2186,24 +2195,12 @@ impl<T: Storage> Raft<T> {
return;
}

let ents = self
.raft_log
.slice(
last_commit + 1,
self.raft_log.committed + 1,
None,
GetEntriesContext(GetEntriesFor::CommitByVote),
)
.unwrap_or_else(|e| {
fatal!(
self.logger,
"unexpected error getting unapplied entries [{}, {}): {:?}",
last_commit + 1,
self.raft_log.committed + 1,
e
);
});
if self.num_pending_conf(&ents) != 0 {
// Scan all unapplied committed entries to find a config change.
// Paginate the scan, to avoid a potentially unlimited memory spike.
let low = last_commit + 1;
let high = self.raft_log.committed + 1;
let ctx = GetEntriesContext(GetEntriesFor::CommitByVote);
if self.has_unapplied_conf_changes(low, high, ctx) {
// The candidate doesn't have to step down in theory, here just for best
// safety as we assume quorum won't change during election.
let term = self.term;
Expand Down
125 changes: 125 additions & 0 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,41 @@ impl<T: Storage> RaftLog<T> {
}
}

// scan visits all log entries in the [lo, hi) range, returning them via the
// given callback. The callback can be invoked multiple times, with consecutive
// sub-ranges of the requested range. Returns up to page_size bytes worth of
// entries at a time. May return more if a single entry size exceeds the limit.
//
// The entries in [lo, hi) must exist, otherwise scan() eventually returns an
// error.
//
// If the callback returns false, scan terminates.
pub(crate) fn scan<F>(
&self,
mut lo: u64,
hi: u64,
page_size: u64,
context: GetEntriesContext,
mut v: F,
) -> Result<()>
where
F: FnMut(Vec<Entry>) -> bool,
{
while lo < hi {
let ents = self.slice(lo, hi, page_size, context)?;
if ents.is_empty() {
return Err(Error::Store(StorageError::Other(
format!("got 0 entries in [{}, {})", lo, hi).into(),
)));
}
lo += ents.len() as u64;
if !v(ents) {
return Ok(());
}
}
Ok(())
}

/// Grabs a slice of entries from the raft. Unlike a rust slice pointer, these are
/// returned by value. The result is truncated to the max_size in bytes.
pub fn slice(
Expand Down Expand Up @@ -672,6 +707,7 @@ mod test {
use crate::errors::{Error, StorageError};
use crate::raft_log::{self, RaftLog};
use crate::storage::{GetEntriesContext, MemStorage};
use crate::NO_LIMIT;
use protobuf::Message as PbMessage;

fn new_entry(index: u64, term: u64) -> eraftpb::Entry {
Expand Down Expand Up @@ -1283,6 +1319,95 @@ mod test {
}
}

fn ents_size(ents: &[eraftpb::Entry]) -> u64 {
let mut size = 0;
for ent in ents {
size += ent.compute_size() as u64;
}
size
}

#[test]
fn test_scan() {
let offset = 47;
let num = 20;
let last = offset + num;
let half = offset + num / 2;
let entries = |from, to| {
let mut ents = vec![];
for i in from..to {
ents.push(new_entry(i, i));
}
ents
};
let entry_size = ents_size(&entries(half, half + 1));

let store = MemStorage::new();
store.wl().apply_snapshot(new_snapshot(offset, 0)).unwrap();
store.wl().append(&entries(offset + 1, half)).unwrap();
let mut raft_log = RaftLog::new(store, default_logger());
raft_log.append(&entries(half, last));

// Test that scan() returns the same entries as slice(), on all inputs.
for page_size in [0, 1, 10, 100, entry_size, entry_size + 1] {
for lo in offset + 1..last {
for hi in lo..=last {
let mut got = vec![];
raft_log
.scan(lo, hi, page_size, GetEntriesContext::empty(false), |e| {
assert!(
e.len() == 1 || ents_size(&e) < page_size,
"{} {} {}",
e.len(),
ents_size(&e),
page_size
);
got.extend(e);
true
})
.unwrap();
let want = raft_log
.slice(lo, hi, NO_LIMIT, GetEntriesContext::empty(false))
.unwrap();
assert_eq!(
got, want,
"scan() and slice() mismatch on [{}, {}) @ {}",
lo, hi, page_size
);
}
}
}

// Test that the callback early return.
let mut iters = 0;
raft_log
.scan(offset + 1, half, 0, GetEntriesContext::empty(false), |_| {
iters += 1;
if iters == 2 {
return false;
}
true
})
.unwrap();
assert_eq!(iters, 2);

// Test that we max out the limit, and not just always return a single entry.
// NB: this test works only because the requested range length is even.
raft_log
.scan(
offset + 1,
offset + 11,
entry_size * 2,
GetEntriesContext::empty(false),
|e| {
assert_eq!(e.len(), 2);
assert_eq!(entry_size * 2, ents_size(&e));
true
},
)
.unwrap();
}

/// `test_log_maybe_append` ensures:
/// If the given (index, term) matches with the existing log:
/// 1. If an existing entry conflicts with a new one (same index
Expand Down
6 changes: 3 additions & 3 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl RaftState {
}

/// Records the context of the caller who calls entries() of Storage trait.
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub struct GetEntriesContext(pub(crate) GetEntriesFor);

impl GetEntriesContext {
Expand All @@ -76,7 +76,7 @@ impl GetEntriesContext {
}
}

#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub(crate) enum GetEntriesFor {
// for sending entries to followers
SendAppend {
Expand Down Expand Up @@ -120,7 +120,7 @@ pub trait Storage {
/// Storage should check context.can_async() first and decide whether to fetch entries asynchronously
/// based on its own implementation. If the entries are fetched asynchronously, storage should return
/// LogTemporarilyUnavailable, and application needs to call `on_entries_fetched(context)` to trigger
/// re-fetch of the entries after the storage finishes fetching the entries.
/// re-fetch of the entries after the storage finishes fetching the entries.
///
/// # Panics
///
Expand Down