Skip to content
111 changes: 53 additions & 58 deletions broadcast/src/buffered/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{metrics, Config, Mailbox, Message};
use crate::buffered::metrics::SequencerLabel;
use commonware_codec::Codec;
use commonware_cryptography::{Committable, Digestible, PublicKey};
use commonware_macros::select;
use commonware_macros::select_loop;
use commonware_p2p::{
utils::codec::{wrap, WrappedSender},
Receiver, Recipients, Sender,
Expand Down Expand Up @@ -149,68 +149,63 @@ impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + C
/// Inner run loop called by `start`.
async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
let mut shutdown = self.context.stopped();

loop {
// Cleanup waiters
self.cleanup_waiters();
let _ = self.metrics.waiters.try_set(self.waiters.len());

select! {
// Handle shutdown signal
_ = &mut shutdown => {
debug!("shutdown");
select_loop! {
self.context,
on_start => {
// Cleanup waiters
self.cleanup_waiters();
let _ = self.metrics.waiters.try_set(self.waiters.len());
},
on_stopped => {
debug!("shutdown");
},
// Handle mailbox messages
mail = self.mailbox_receiver.next() => {
let Some(msg) = mail else {
error!("mailbox receiver failed");
break;
},

// Handle mailbox messages
mail = self.mailbox_receiver.next() => {
let Some(msg) = mail else {
error!("mailbox receiver failed");
};
match msg {
Message::Broadcast{ recipients, message, responder } => {
trace!("mailbox: broadcast");
self.handle_broadcast(&mut sender, recipients, message, responder).await;
}
Message::Subscribe{ peer, commitment, digest, responder } => {
trace!("mailbox: subscribe");
self.handle_subscribe(peer, commitment, digest, responder).await;
}
Message::Get{ peer, commitment, digest, responder } => {
trace!("mailbox: get");
self.handle_get(peer, commitment, digest, responder).await;
}
}
},
// Handle incoming messages
msg = receiver.recv() => {
// Error handling
let (peer, msg) = match msg {
Ok(r) => r,
Err(err) => {
error!(?err, "receiver failed");
break;
};
match msg {
Message::Broadcast{ recipients, message, responder } => {
trace!("mailbox: broadcast");
self.handle_broadcast(&mut sender, recipients, message, responder).await;
}
Message::Subscribe{ peer, commitment, digest, responder } => {
trace!("mailbox: subscribe");
self.handle_subscribe(peer, commitment, digest, responder).await;
}
Message::Get{ peer, commitment, digest, responder } => {
trace!("mailbox: get");
self.handle_get(peer, commitment, digest, responder).await;
}
}
},
};

// Decode the message
let msg = match msg {
Ok(msg) => msg,
Err(err) => {
warn!(?err, ?peer, "failed to decode message");
self.metrics.receive.inc(Status::Invalid);
continue;
}
};

// Handle incoming messages
msg = receiver.recv() => {
// Error handling
let (peer, msg) = match msg {
Ok(r) => r,
Err(err) => {
error!(?err, "receiver failed");
break;
}
};

// Decode the message
let msg = match msg {
Ok(msg) => msg,
Err(err) => {
warn!(?err, ?peer, "failed to decode message");
self.metrics.receive.inc(Status::Invalid);
continue;
}
};

trace!(?peer, "network");
self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
self.handle_network(peer, msg).await;
},
}
trace!(?peer, "network");
self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
self.handle_network(peer, msg).await;
},
}
}

Expand Down
Loading
Loading