Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 31 additions & 34 deletions broadcast/src/buffered/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,40 +158,37 @@ impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + C
debug!("shutdown");
},
// Handle mailbox messages
mail = self.mailbox_receiver.recv() => {
let Some(msg) = mail else {
error!("mailbox receiver failed");
break;
};
match msg {
Message::Broadcast {
recipients,
message,
responder,
} => {
trace!("mailbox: broadcast");
self.handle_broadcast(&mut sender, recipients, message, responder)
.await;
}
Message::Subscribe {
peer,
commitment,
digest,
responder,
} => {
trace!("mailbox: subscribe");
self.handle_subscribe(peer, commitment, digest, responder)
.await;
}
Message::Get {
peer,
commitment,
digest,
responder,
} => {
trace!("mailbox: get");
self.handle_get(peer, commitment, digest, responder).await;
}
Some(msg) = self.mailbox_receiver.recv() else {
error!("mailbox receiver failed");
break;
} => match msg {
Message::Broadcast {
recipients,
message,
responder,
} => {
trace!("mailbox: broadcast");
self.handle_broadcast(&mut sender, recipients, message, responder)
.await;
}
Message::Subscribe {
peer,
commitment,
digest,
responder,
} => {
trace!("mailbox: subscribe");
self.handle_subscribe(peer, commitment, digest, responder)
.await;
}
Message::Get {
peer,
commitment,
digest,
responder,
} => {
trace!("mailbox: get");
self.handle_get(peer, commitment, digest, responder).await;
}
},
// Handle incoming messages
Expand Down
70 changes: 32 additions & 38 deletions collector/src/p2p/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,52 +132,46 @@ where
debug!("context shutdown, stopping engine");
},
// Command from the mailbox
command = self.mailbox.recv() => {
if let Some(command) = command {
match command {
Message::Send {
request,
recipients,
responder,
} => {
// Track commitment (if not already tracked)
let commitment = request.commitment();
let entry = self.tracked.entry(commitment).or_insert_with(|| {
self.outstanding.inc();
(HashSet::new(), HashSet::new())
});

// Send the request to recipients
match req_tx
.send(recipients, request, self.priority_request)
.await
{
Ok(recipients) => {
entry.0.extend(recipients.iter().cloned());
responder.send_lossy(Ok(recipients));
}
Err(err) => {
error!(?err, ?commitment, "failed to send message");
responder.send_lossy(Err(Error::SendFailed(err.into())));
}
Some(command) = self.mailbox.recv() else continue => {
match command {
Message::Send {
request,
recipients,
responder,
} => {
// Track commitment (if not already tracked)
let commitment = request.commitment();
let entry = self.tracked.entry(commitment).or_insert_with(|| {
self.outstanding.inc();
(HashSet::new(), HashSet::new())
});

// Send the request to recipients
match req_tx
.send(recipients, request, self.priority_request)
.await
{
Ok(recipients) => {
entry.0.extend(recipients.iter().cloned());
responder.send_lossy(Ok(recipients));
}
}
Message::Cancel { commitment } => {
if self.tracked.remove(&commitment).is_none() {
debug!(?commitment, "ignoring removal of unknown commitment");
Err(err) => {
error!(?err, ?commitment, "failed to send message");
responder.send_lossy(Err(Error::SendFailed(err.into())));
}
let _ = self.outstanding.try_set(self.tracked.len());
}
}
Message::Cancel { commitment } => {
if self.tracked.remove(&commitment).is_none() {
debug!(?commitment, "ignoring removal of unknown commitment");
}
let _ = self.outstanding.try_set(self.tracked.len());
}
}
},

// Response from a handler
ready = processed.next_completed() => {
// Error handling
let Ok((peer, reply)) = ready else {
continue;
};
Ok((peer, reply)) = processed.next_completed() else continue => {
self.responses.inc();

// Send the response
Expand Down
11 changes: 4 additions & 7 deletions consensus/src/aggregation/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,10 @@ impl<
debug!("shutdown");
},
// Handle refresh epoch deadline
epoch = epoch_updates.recv() => {
// Error handling
let Some(epoch) = epoch else {
error!("epoch subscription failed");
break;
};

Some(epoch) = epoch_updates.recv() else {
error!("epoch subscription failed");
break;
} => {
// Refresh the epoch
debug!(current = %self.epoch, new = %epoch, "refresh epoch");
assert!(epoch >= self.epoch);
Expand Down
25 changes: 10 additions & 15 deletions consensus/src/marshal/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,8 @@ where
on_stopped => {
debug!("context shutdown, stopping marshal");
},
// Handle waiter completions first
result = waiters.next_completed() => {
let Ok((commitment, block)) = result else {
continue; // Aborted future
};
// Handle waiter completions first (aborted futures are skipped)
Ok((commitment, block)) = waiters.next_completed() else continue => {
self.notify_subscribers(commitment, &block).await;
},
// Handle application acknowledgements next
Expand All @@ -346,11 +343,10 @@ where
}
},
// Handle consensus inputs before backfill or resolver traffic
mailbox_message = self.mailbox.recv() => {
let Some(message) = mailbox_message else {
info!("mailbox closed, shutting down");
break;
};
Some(message) = self.mailbox.recv() else {
info!("mailbox closed, shutting down");
break;
} => {
match message {
Message::GetInfo {
identifier,
Expand Down Expand Up @@ -567,11 +563,10 @@ where
}
},
// Handle resolver messages last
message = resolver_rx.recv() => {
let Some(message) = message else {
info!("handler closed, shutting down");
break;
};
Some(message) = resolver_rx.recv() else {
info!("handler closed, shutting down");
break;
} => {
match message {
handler::Message::Produce { key, response } => {
match key {
Expand Down
11 changes: 4 additions & 7 deletions consensus/src/ordered_broadcast/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,10 @@ impl<
debug!("shutdown");
},
// Handle refresh epoch deadline
epoch = epoch_updates.recv() => {
// Error handling
let Some(epoch) = epoch else {
error!("epoch subscription failed");
break;
};

Some(epoch) = epoch_updates.recv() else {
error!("epoch subscription failed");
break;
} => {
// Refresh the epoch
debug!(current = %self.epoch, new = %epoch, "refresh epoch");
assert!(epoch >= self.epoch);
Expand Down
97 changes: 41 additions & 56 deletions consensus/src/simplex/actors/batcher/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,61 +195,51 @@ impl<
on_stopped => {
debug!("context shutdown, stopping batcher");
},
message = self.mailbox_receiver.recv() => {
match message {
Some(Message::Update {
current: new_current,
leader,
finalized: new_finalized,
active,
}) => {
current = new_current;
finalized = new_finalized;
work.entry(current)
.or_insert_with(|| self.new_round())
.set_leader(leader);

// Check if the leader has been active recently
let skip_timeout = self.skip_timeout.get() as usize;
let is_active =
// Ensure we have enough data to judge activity (none of this
// data may be in the last skip_timeout views if we jumped ahead
// to a new view)
work.len() < skip_timeout
// Leader active in at least one recent round
|| work.iter().rev().take(skip_timeout).any(|(_, round)| round.is_active(leader));
active.send_lossy(is_active);

// Setting leader may enable batch verification
updated_view = current;
Some(message) = self.mailbox_receiver.recv() else break => match message {
Message::Update {
current: new_current,
leader,
finalized: new_finalized,
active,
} => {
current = new_current;
finalized = new_finalized;
work.entry(current)
.or_insert_with(|| self.new_round())
.set_leader(leader);

// Check if the leader has been active recently
let skip_timeout = self.skip_timeout.get() as usize;
let is_active =
// Ensure we have enough data to judge activity (none of this
// data may be in the last skip_timeout views if we jumped ahead
// to a new view)
work.len() < skip_timeout
// Leader active in at least one recent round
|| work.iter().rev().take(skip_timeout).any(|(_, round)| round.is_active(leader));
active.send_lossy(is_active);

// Setting leader may enable batch verification
updated_view = current;
}
Message::Constructed(message) => {
// If the view isn't interesting, we can skip
let view = message.view();
if !interesting(self.activity_timeout, finalized, current, view, false) {
continue;
}
Some(Message::Constructed(message)) => {
// If the view isn't interesting, we can skip
let view = message.view();
if !interesting(self.activity_timeout, finalized, current, view, false) {
continue;
}

// Add the message to the verifier
work.entry(view)
.or_insert_with(|| self.new_round())
.add_constructed(message)
.await;
self.added.inc();
updated_view = view;
}
None => {
break;
}
// Add the message to the verifier
work.entry(view)
.or_insert_with(|| self.new_round())
.add_constructed(message)
.await;
self.added.inc();
updated_view = view;
}
},
// Handle certificates from the network
message = certificate_receiver.recv() => {
// If the channel is closed, we should exit
let Ok((sender, message)) = message else {
break;
};

Ok((sender, message)) = certificate_receiver.recv() else break => {
// If there is a decoding error, block
let Ok(message) = message else {
warn!(?sender, "blocking peer for decoding error");
Expand Down Expand Up @@ -361,12 +351,7 @@ impl<
continue;
},
// Handle votes from the network
message = vote_receiver.recv() => {
// If the channel is closed, we should exit
let Ok((sender, message)) = message else {
break;
};

Ok((sender, message)) = vote_receiver.recv() else break => {
// If there is a decoding error, block
let Ok(message) = message else {
warn!(?sender, "blocking peer for decoding error");
Expand Down
10 changes: 2 additions & 8 deletions consensus/src/simplex/actors/resolver/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,7 @@ impl<
_ = &mut resolver_task => {
break;
},
mailbox = self.mailbox_receiver.recv() => {
let Some(message) = mailbox else {
break;
};
Some(message) = self.mailbox_receiver.recv() else break => {
match message {
MailboxMessage::Certificate(certificate) => {
// Certificates from mailbox have no associated request view
Expand All @@ -146,10 +143,7 @@ impl<
}
}
},
handler = handler_rx.recv() => {
let Some(message) = handler else {
break;
};
Some(message) = handler_rx.recv() else break => {
self.handle_resolver(message, &mut voter, &mut resolver)
.await;
},
Expand Down
Loading
Loading