Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 53 additions & 58 deletions broadcast/src/buffered/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{metrics, Config, Mailbox, Message};
use crate::buffered::metrics::SequencerLabel;
use commonware_codec::Codec;
use commonware_cryptography::{Committable, Digestible, PublicKey};
use commonware_macros::select;
use commonware_macros::select_loop;
use commonware_p2p::{
utils::codec::{wrap, WrappedSender},
Receiver, Recipients, Sender,
Expand Down Expand Up @@ -150,68 +150,63 @@ impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + C
/// Inner run loop called by `start`.
async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
let mut shutdown = self.context.stopped();

loop {
// Cleanup waiters
self.cleanup_waiters();
let _ = self.metrics.waiters.try_set(self.waiters.len());

select! {
// Handle shutdown signal
_ = &mut shutdown => {
debug!("shutdown");
select_loop! {
self.context,
on_start => {
// Cleanup waiters
self.cleanup_waiters();
let _ = self.metrics.waiters.try_set(self.waiters.len());
},
on_stopped => {
debug!("shutdown");
},
// Handle mailbox messages
mail = self.mailbox_receiver.next() => {
let Some(msg) = mail else {
error!("mailbox receiver failed");
break;
},

// Handle mailbox messages
mail = self.mailbox_receiver.next() => {
let Some(msg) = mail else {
error!("mailbox receiver failed");
};
match msg {
Message::Broadcast{ recipients, message, responder } => {
trace!("mailbox: broadcast");
self.handle_broadcast(&mut sender, recipients, message, responder).await;
}
Message::Subscribe{ peer, commitment, digest, responder } => {
trace!("mailbox: subscribe");
self.handle_subscribe(peer, commitment, digest, responder).await;
}
Message::Get{ peer, commitment, digest, responder } => {
trace!("mailbox: get");
self.handle_get(peer, commitment, digest, responder).await;
}
}
},
// Handle incoming messages
msg = receiver.recv() => {
// Error handling
let (peer, msg) = match msg {
Ok(r) => r,
Err(err) => {
error!(?err, "receiver failed");
break;
};
match msg {
Message::Broadcast{ recipients, message, responder } => {
trace!("mailbox: broadcast");
self.handle_broadcast(&mut sender, recipients, message, responder).await;
}
Message::Subscribe{ peer, commitment, digest, responder } => {
trace!("mailbox: subscribe");
self.handle_subscribe(peer, commitment, digest, responder).await;
}
Message::Get{ peer, commitment, digest, responder } => {
trace!("mailbox: get");
self.handle_get(peer, commitment, digest, responder).await;
}
}
},
};

// Decode the message
let msg = match msg {
Ok(msg) => msg,
Err(err) => {
warn!(?err, ?peer, "failed to decode message");
self.metrics.receive.inc(Status::Invalid);
continue;
}
};

// Handle incoming messages
msg = receiver.recv() => {
// Error handling
let (peer, msg) = match msg {
Ok(r) => r,
Err(err) => {
error!(?err, "receiver failed");
break;
}
};

// Decode the message
let msg = match msg {
Ok(msg) => msg,
Err(err) => {
warn!(?err, ?peer, "failed to decode message");
self.metrics.receive.inc(Status::Invalid);
continue;
}
};

trace!(?peer, "network");
self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
self.handle_network(peer, msg).await;
},
}
trace!(?peer, "network");
self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
self.handle_network(peer, msg).await;
},
}
}

Expand Down
Loading
Loading