Skip to content

Commit

Permalink
Maintain broadcast group configuration in progress
Browse files Browse the repository at this point in the history
Signed-off-by: LintianShi <[email protected]>
  • Loading branch information
LintianShi committed Oct 13, 2022
1 parent 4a36655 commit 87b2014
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 25 deletions.
75 changes: 50 additions & 25 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,31 @@ impl<T: Storage> Raft<T> {
self.batch_append = batch_append;
}

/// Assigns broadcast groups to peers.
///
/// The tuple is (`peer_id`, `group_id`). `group_id` should be larger than 0.
///
/// The group information is only stored in memory. So you need to configure
/// it every time a raft state machine is initialized or a snapshot is applied.
pub fn assign_broadcast_groups(&mut self, ids: &[(u64, u64)]) {
let prs = self.mut_prs();
for (peer_id, group_id) in ids {
assert!(*group_id > 0);
if let Some(pr) = prs.get_mut(*peer_id) {
pr.broadcast_group_id = *group_id;
} else {
continue;
}
}
}

/// Removes all broadcast group configurations.
pub fn clear_broadcast_group(&mut self) {
for (_, pr) in self.mut_prs().iter_mut() {
pr.broadcast_group_id = 0;
}
}

/// Configures group commit.
///
/// If group commit is enabled, only logs replicated to at least two
Expand All @@ -528,6 +553,31 @@ impl<T: Storage> Raft<T> {
self.prs().group_commit()
}

/// Checks whether the raft group is using group commit and consistent
/// over group.
///
/// If it can't get a correct answer, `None` is returned.
pub fn check_group_commit_consistent(&mut self) -> Option<bool> {
if self.state != StateRole::Leader {
return None;
}
// Previous leader may have reach consistency already.
//
// check applied_index instead of committed_index to avoid pending conf change.
if !self.apply_to_current_term() {
return None;
}
let (index, use_group_commit) = self.mut_prs().maximal_committed_index();
debug!(
self.logger,
"check group commit consistent";
"index" => index,
"use_group_commit" => use_group_commit,
"committed" => self.raft_log.committed
);
Some(use_group_commit && index == self.raft_log.committed)
}

/// Assigns groups to peers.
///
/// The tuple is (`peer_id`, `group_id`). `group_id` should be larger than 0.
Expand Down Expand Up @@ -556,31 +606,6 @@ impl<T: Storage> Raft<T> {
}
}

/// Checks whether the raft group is using group commit and consistent
/// over group.
///
/// If it can't get a correct answer, `None` is returned.
pub fn check_group_commit_consistent(&mut self) -> Option<bool> {
if self.state != StateRole::Leader {
return None;
}
// Previous leader may have reach consistency already.
//
// check applied_index instead of committed_index to avoid pending conf change.
if !self.apply_to_current_term() {
return None;
}
let (index, use_group_commit) = self.mut_prs().maximal_committed_index();
debug!(
self.logger,
"check group commit consistent";
"index" => index,
"use_group_commit" => use_group_commit,
"committed" => self.raft_log.committed
);
Some(use_group_commit && index == self.raft_log.committed)
}

/// Checks if logs are committed to its term.
///
/// The check is useful usually when raft is leader.
Expand Down
5 changes: 5 additions & 0 deletions src/tracker/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub struct Progress {
/// Only logs replicated to different group will be committed if any group is configured.
pub commit_group_id: u64,

/// Leader only replicates log entries to the agent of each group,
/// and the agent broadcasts logs within a group.
pub broadcast_group_id: u64,

/// Committed index in raft_log
pub committed_index: u64,
}
Expand All @@ -68,6 +72,7 @@ impl Progress {
recent_active: false,
ins: Inflights::new(ins_size),
commit_group_id: 0,
broadcast_group_id: 0,
committed_index: 0,
}
}
Expand Down

0 comments on commit 87b2014

Please sign in to comment.