Skip to content

Commit

Permalink
Commit aggregation (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zacholme7 authored Feb 14, 2025
1 parent afa6c0d commit 0cdb9bb
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 17 deletions.
55 changes: 48 additions & 7 deletions anchor/common/qbft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ where
/// Past prepare consensus that we have reached
past_consensus: HashMap<Round, D::Hash>,

/// Aggregated commit message
aggregated_commit: Option<SignedSSVMessage>,

// Network sender
send_message: S,
}
Expand Down Expand Up @@ -136,6 +139,8 @@ where

past_consensus: HashMap::new(),

aggregated_commit: None,

send_message,
};
qbft.data
Expand All @@ -160,6 +165,11 @@ where
self.start_round();
}

// Get the aggregated commit message, if it exists
pub fn get_aggregated_commit(&self) -> Option<SignedSSVMessage> {
self.aggregated_commit.clone()
}

// Validation and check functions.
fn check_leader(&self, operator_id: &OperatorId) -> bool {
self.config.leader_fn().leader_function(
Expand Down Expand Up @@ -659,17 +669,48 @@ where

// All validation successful, make sure we are in the proper commit state
if matches!(self.state, InstanceState::Commit) {
// Todo!(). Commit aggregation

// We have come to commit consensus, mark ourself as completed and record the agreed upon
// value
self.state = InstanceState::Complete;
self.completed = Some(Completed::Success(hash));
debug!(in = ?self.config.operator_id(), state = ?self.state, "Reached a COMMIT consensus. Success!");
// Aggregate all of the commit messages
let commit_quorum = self.commit_container.get_quorum_of_messages(round);
let aggregated_commit = self.aggregate_commit_messages(commit_quorum);
if aggregated_commit.is_some() {
debug!(in = ?self.config.operator_id(), state = ?self.state, "Reached a COMMIT consensus. Success!");
self.state = InstanceState::Complete;
self.completed = Some(Completed::Success(hash));
self.aggregated_commit = aggregated_commit;
} else {
error!("Failed to aggregate commit quorum")
}
}
}
}

fn aggregate_commit_messages(
&self,
commit_quorum: Vec<WrappedQbftMessage>,
) -> Option<SignedSSVMessage> {
// We know this exists, but in favor of avoiding expect match the first element to Some.
// This will be the commit message that we aggregate on top of
if let Some(first_commit) = commit_quorum.first() {
let mut aggregated_commit = first_commit.signed_message.clone();
let aggregated_ssv = aggregated_commit.ssv_message();

// Sanity check that all of the messages match
commit_quorum[1..]
.iter()
.all(|commit_msg| aggregated_ssv == commit_msg.signed_message.ssv_message())
.then_some(())?;

// Aggregate all of the commits together
let signed_commits = commit_quorum[1..]
.iter()
.map(|msg| msg.signed_message.clone());
aggregated_commit.aggregate(signed_commits);
return Some(aggregated_commit);
}

None
}

/// We have received a round change message.
fn received_round_change(
&mut self,
Expand Down
16 changes: 16 additions & 0 deletions anchor/common/qbft/src/msg_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ impl MessageContainer {
.unwrap_or(0)
}

/// If we have a quorum for the round, get all of the messages that correspond to that quorum
pub fn get_quorum_of_messages(&self, round: Round) -> Vec<WrappedQbftMessage> {
let mut msgs = vec![];
if let Some(hash) = self.has_quorum(round) {
// collect all of the messages where root = quorum hash
if let Some(round_messages) = self.messages.get(&round) {
for msg in round_messages.values() {
if msg.qbft_message.root == hash {
msgs.push(msg.clone());
}
}
}
}
msgs
}

/// Gets all messages for a specific round
pub fn get_messages_for_round(&self, round: Round) -> Vec<&WrappedQbftMessage> {
// If we have messages for this round in our container, return them all
Expand Down
15 changes: 15 additions & 0 deletions anchor/common/ssv_types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,21 @@ impl SignedSSVMessage {
&self.full_data
}

/// Aggregate a set of signed ssv messages into Self
pub fn aggregate<I>(&mut self, others: I)
where
I: IntoIterator<Item = SignedSSVMessage>,
{
for signed_msg in others {
// These will only all have 1 signature/operator, but we call extend for safety
self.signatures.extend(signed_msg.signatures);
self.operator_ids.extend(signed_msg.operator_ids);
}

self.signatures.sort();
self.operator_ids.sort();
}

// Validate the signed message to ensure that it is well formed for qbft processing
pub fn validate(&self) -> bool {
// OperatorID must have at least one element
Expand Down
11 changes: 11 additions & 0 deletions anchor/qbft_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,17 @@ async fn qbft_instance<D: QbftData<Hash = Hash256>>(
error!("could not send qbft result");
}
}

// Send the decided message (aggregated commit)
match qbft.get_aggregated_commit() {
Some(msg) => {
network_tx.send(msg).unwrap_or_else(|e| {
error!("Failed to send signed ssv message to network: {:?}", e)
});
}
None => error!("Aggregated commit does not exist"),
}

instance = QbftInstance::Decided { value: completed };
} else {
instance = QbftInstance::Initialized {
Expand Down
43 changes: 33 additions & 10 deletions anchor/qbft_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,17 @@ where
// Helper to verify consensus is reached
pub async fn verify_consensus(&mut self) {
while let Some(result) = self.consensus_rx.recv().await {
// Confirm that consensus was reached
assert!(result.reached_consensus, "Consensus was not reached");

// Confirm that the aggregated message contains a quorum of signatures
let aggregated_commit = result
.aggregated_commit
.expect("If consensus was reached, this exists");
assert!(
aggregated_commit.signatures().len() as u64
>= (self.tester.size as u64 - self.tester.size.get_f())
);
}
}
}
Expand Down Expand Up @@ -129,7 +139,7 @@ where
// Track mapping from operator id to the respective manager
managers: HashMap<OperatorId, Arc<QbftManager<ManualSlotClock>>>,
// The size of the committee
size: CommitteeSize,
pub size: CommitteeSize,
// Mapping of the data hash to the data identifier. This is to send data to the proper instance
identifiers: HashMap<u64, D::Id>,
// Mapping from data to the results of the consensus
Expand Down Expand Up @@ -367,15 +377,6 @@ where
drop(consensus_tx);
}

fn signed_to_wrapped(&self, signed: SignedSSVMessage) -> WrappedQbftMessage {
let deser_qbft = QbftMessage::from_ssz_bytes(signed.ssv_message().data())
.expect("We have a valid qbft message");
WrappedQbftMessage {
signed_message: signed,
qbft_message: deser_qbft,
}
}

// Once an instance has completed, we want to record what happened
fn handle_completion(&self, hash: Hash256, msg: Result<Completed<D>, QbftError>) {
// Decrement the amount of instances running for this data
Expand Down Expand Up @@ -415,6 +416,16 @@ where
finished
}

// Convert a signed ssv message into a wrapped ssv message
fn signed_to_wrapped(&self, signed: SignedSSVMessage) -> WrappedQbftMessage {
let deser_qbft = QbftMessage::from_ssz_bytes(signed.ssv_message().data())
.expect("We have a valid qbft message");
WrappedQbftMessage {
signed_message: signed,
qbft_message: deser_qbft,
}
}

// Process and send a network message to the correct instance
fn process_network_message(&self, mut wrapped_msg: WrappedQbftMessage) {
let sender_operator_id = wrapped_msg
Expand All @@ -424,6 +435,17 @@ where
.expect("One signer");
let sender_operator_id = OperatorId::from(*sender_operator_id);

// If this is a decided message, want to record it in the consensus results.
// We know this is an aggregated commit if the number of signatures is > 1
if wrapped_msg.signed_message.signatures().len() > 1 {
let mut results_write = self.results.write().unwrap();
let results = results_write
.get_mut(&wrapped_msg.qbft_message.root)
.expect("Value exists");
results.aggregated_commit = Some(wrapped_msg.signed_message);
return;
}

// Now we have a message ready to be sent back into the instance. Get the id
// corresponding to the message.
let data_id = self
Expand Down Expand Up @@ -500,6 +522,7 @@ pub struct ConsensusResult {
min_for_consensus: u64,
successful: u64,
timed_out: u64,
aggregated_commit: Option<SignedSSVMessage>,
}

#[cfg(test)]
Expand Down

0 comments on commit 0cdb9bb

Please sign in to comment.