Skip to content
Merged
65 changes: 31 additions & 34 deletions broadcast/src/buffered/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,40 +158,37 @@ impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + C
debug!("shutdown");
},
// Handle mailbox messages
mail = self.mailbox_receiver.recv() => {
let Some(msg) = mail else {
error!("mailbox receiver failed");
break;
};
match msg {
Message::Broadcast {
recipients,
message,
responder,
} => {
trace!("mailbox: broadcast");
self.handle_broadcast(&mut sender, recipients, message, responder)
.await;
}
Message::Subscribe {
peer,
commitment,
digest,
responder,
} => {
trace!("mailbox: subscribe");
self.handle_subscribe(peer, commitment, digest, responder)
.await;
}
Message::Get {
peer,
commitment,
digest,
responder,
} => {
trace!("mailbox: get");
self.handle_get(peer, commitment, digest, responder).await;
}
Some(msg) = self.mailbox_receiver.recv() else {
error!("mailbox receiver failed");
break;
} => match msg {
Message::Broadcast {
recipients,
message,
responder,
} => {
trace!("mailbox: broadcast");
self.handle_broadcast(&mut sender, recipients, message, responder)
.await;
}
Message::Subscribe {
peer,
commitment,
digest,
responder,
} => {
trace!("mailbox: subscribe");
self.handle_subscribe(peer, commitment, digest, responder)
.await;
}
Message::Get {
peer,
commitment,
digest,
responder,
} => {
trace!("mailbox: get");
self.handle_get(peer, commitment, digest, responder).await;
}
},
// Handle incoming messages
Expand Down
70 changes: 32 additions & 38 deletions collector/src/p2p/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,52 +132,46 @@ where
debug!("context shutdown, stopping engine");
},
// Command from the mailbox
command = self.mailbox.recv() => {
if let Some(command) = command {
match command {
Message::Send {
request,
recipients,
responder,
} => {
// Track commitment (if not already tracked)
let commitment = request.commitment();
let entry = self.tracked.entry(commitment).or_insert_with(|| {
self.outstanding.inc();
(HashSet::new(), HashSet::new())
});

// Send the request to recipients
match req_tx
.send(recipients, request, self.priority_request)
.await
{
Ok(recipients) => {
entry.0.extend(recipients.iter().cloned());
responder.send_lossy(Ok(recipients));
}
Err(err) => {
error!(?err, ?commitment, "failed to send message");
responder.send_lossy(Err(Error::SendFailed(err.into())));
}
Some(command) = self.mailbox.recv() else continue => {
match command {
Message::Send {
request,
recipients,
responder,
} => {
// Track commitment (if not already tracked)
let commitment = request.commitment();
let entry = self.tracked.entry(commitment).or_insert_with(|| {
self.outstanding.inc();
(HashSet::new(), HashSet::new())
});

// Send the request to recipients
match req_tx
.send(recipients, request, self.priority_request)
.await
{
Ok(recipients) => {
entry.0.extend(recipients.iter().cloned());
responder.send_lossy(Ok(recipients));
}
}
Message::Cancel { commitment } => {
if self.tracked.remove(&commitment).is_none() {
debug!(?commitment, "ignoring removal of unknown commitment");
Err(err) => {
error!(?err, ?commitment, "failed to send message");
responder.send_lossy(Err(Error::SendFailed(err.into())));
}
let _ = self.outstanding.try_set(self.tracked.len());
}
}
Message::Cancel { commitment } => {
if self.tracked.remove(&commitment).is_none() {
debug!(?commitment, "ignoring removal of unknown commitment");
}
let _ = self.outstanding.try_set(self.tracked.len());
}
}
},

// Response from a handler
ready = processed.next_completed() => {
// Error handling
let Ok((peer, reply)) = ready else {
continue;
};
Ok((peer, reply)) = processed.next_completed() else continue => {
self.responses.inc();

// Send the response
Expand Down
11 changes: 4 additions & 7 deletions consensus/src/aggregation/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,10 @@ impl<
debug!("shutdown");
},
// Handle refresh epoch deadline
epoch = epoch_updates.recv() => {
// Error handling
let Some(epoch) = epoch else {
error!("epoch subscription failed");
break;
};

Some(epoch) = epoch_updates.recv() else {
error!("epoch subscription failed");
break;
} => {
// Refresh the epoch
debug!(current = %self.epoch, new = %epoch, "refresh epoch");
assert!(epoch >= self.epoch);
Expand Down
25 changes: 10 additions & 15 deletions consensus/src/marshal/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,8 @@ where
on_stopped => {
debug!("context shutdown, stopping marshal");
},
// Handle waiter completions first
result = waiters.next_completed() => {
let Ok((commitment, block)) = result else {
continue; // Aborted future
};
// Handle waiter completions first (aborted futures are skipped)
Ok((commitment, block)) = waiters.next_completed() else continue => {
self.notify_subscribers(commitment, &block).await;
},
// Handle application acknowledgements next
Expand All @@ -346,11 +343,10 @@ where
}
},
// Handle consensus inputs before backfill or resolver traffic
mailbox_message = self.mailbox.recv() => {
let Some(message) = mailbox_message else {
info!("mailbox closed, shutting down");
break;
};
Some(message) = self.mailbox.recv() else {
info!("mailbox closed, shutting down");
break;
} => {
match message {
Message::GetInfo {
identifier,
Expand Down Expand Up @@ -567,11 +563,10 @@ where
}
},
// Handle resolver messages last
message = resolver_rx.recv() => {
let Some(message) = message else {
info!("handler closed, shutting down");
break;
};
Some(message) = resolver_rx.recv() else {
info!("handler closed, shutting down");
break;
} => {
match message {
handler::Message::Produce { key, response } => {
match key {
Expand Down
11 changes: 4 additions & 7 deletions consensus/src/ordered_broadcast/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,10 @@ impl<
debug!("shutdown");
},
// Handle refresh epoch deadline
epoch = epoch_updates.recv() => {
// Error handling
let Some(epoch) = epoch else {
error!("epoch subscription failed");
break;
};

Some(epoch) = epoch_updates.recv() else {
error!("epoch subscription failed");
break;
} => {
// Refresh the epoch
debug!(current = %self.epoch, new = %epoch, "refresh epoch");
assert!(epoch >= self.epoch);
Expand Down
97 changes: 41 additions & 56 deletions consensus/src/simplex/actors/batcher/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,61 +195,51 @@ impl<
on_stopped => {
debug!("context shutdown, stopping batcher");
},
message = self.mailbox_receiver.recv() => {
match message {
Some(Message::Update {
current: new_current,
leader,
finalized: new_finalized,
active,
}) => {
current = new_current;
finalized = new_finalized;
work.entry(current)
.or_insert_with(|| self.new_round())
.set_leader(leader);

// Check if the leader has been active recently
let skip_timeout = self.skip_timeout.get() as usize;
let is_active =
// Ensure we have enough data to judge activity (none of this
// data may be in the last skip_timeout views if we jumped ahead
// to a new view)
work.len() < skip_timeout
// Leader active in at least one recent round
|| work.iter().rev().take(skip_timeout).any(|(_, round)| round.is_active(leader));
active.send_lossy(is_active);

// Setting leader may enable batch verification
updated_view = current;
Some(message) = self.mailbox_receiver.recv() else break => match message {
Message::Update {
current: new_current,
leader,
finalized: new_finalized,
active,
} => {
current = new_current;
finalized = new_finalized;
work.entry(current)
.or_insert_with(|| self.new_round())
.set_leader(leader);

// Check if the leader has been active recently
let skip_timeout = self.skip_timeout.get() as usize;
let is_active =
// Ensure we have enough data to judge activity (none of this
// data may be in the last skip_timeout views if we jumped ahead
// to a new view)
work.len() < skip_timeout
// Leader active in at least one recent round
|| work.iter().rev().take(skip_timeout).any(|(_, round)| round.is_active(leader));
active.send_lossy(is_active);

// Setting leader may enable batch verification
updated_view = current;
}
Message::Constructed(message) => {
// If the view isn't interesting, we can skip
let view = message.view();
if !interesting(self.activity_timeout, finalized, current, view, false) {
continue;
}
Some(Message::Constructed(message)) => {
// If the view isn't interesting, we can skip
let view = message.view();
if !interesting(self.activity_timeout, finalized, current, view, false) {
continue;
}

// Add the message to the verifier
work.entry(view)
.or_insert_with(|| self.new_round())
.add_constructed(message)
.await;
self.added.inc();
updated_view = view;
}
None => {
break;
}
// Add the message to the verifier
work.entry(view)
.or_insert_with(|| self.new_round())
.add_constructed(message)
.await;
self.added.inc();
updated_view = view;
}
},
// Handle certificates from the network
message = certificate_receiver.recv() => {
// If the channel is closed, we should exit
let Ok((sender, message)) = message else {
break;
};

Ok((sender, message)) = certificate_receiver.recv() else break => {
// If there is a decoding error, block
let Ok(message) = message else {
warn!(?sender, "blocking peer for decoding error");
Expand Down Expand Up @@ -361,12 +351,7 @@ impl<
continue;
},
// Handle votes from the network
message = vote_receiver.recv() => {
// If the channel is closed, we should exit
let Ok((sender, message)) = message else {
break;
};

Ok((sender, message)) = vote_receiver.recv() else break => {
// If there is a decoding error, block
let Ok(message) = message else {
warn!(?sender, "blocking peer for decoding error");
Expand Down
10 changes: 2 additions & 8 deletions consensus/src/simplex/actors/resolver/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,7 @@ impl<
_ = &mut resolver_task => {
break;
},
mailbox = self.mailbox_receiver.recv() => {
let Some(message) = mailbox else {
break;
};
Some(message) = self.mailbox_receiver.recv() else break => {
match message {
MailboxMessage::Certificate(certificate) => {
// Certificates from mailbox have no associated request view
Expand All @@ -146,10 +143,7 @@ impl<
}
}
},
handler = handler_rx.recv() => {
let Some(message) = handler else {
break;
};
Some(message) = handler_rx.recv() else break => {
self.handle_resolver(message, &mut voter, &mut resolver)
.await;
},
Expand Down
Loading
Loading