diff --git a/broadcast/src/buffered/engine.rs b/broadcast/src/buffered/engine.rs index f3ab841d1d..a37d877860 100644 --- a/broadcast/src/buffered/engine.rs +++ b/broadcast/src/buffered/engine.rs @@ -2,7 +2,7 @@ use super::{metrics, Config, Mailbox, Message}; use crate::buffered::metrics::SequencerLabel; use commonware_codec::Codec; use commonware_cryptography::{Committable, Digestible, PublicKey}; -use commonware_macros::select; +use commonware_macros::select_loop; use commonware_p2p::{ utils::codec::{wrap, WrappedSender}, Receiver, Recipients, Sender, @@ -150,68 +150,63 @@ impl, impl Receiver)) { let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1); - let mut shutdown = self.context.stopped(); - loop { - // Cleanup waiters - self.cleanup_waiters(); - let _ = self.metrics.waiters.try_set(self.waiters.len()); - - select! { - // Handle shutdown signal - _ = &mut shutdown => { - debug!("shutdown"); + select_loop! { + self.context, + on_start => { + // Cleanup waiters + self.cleanup_waiters(); + let _ = self.metrics.waiters.try_set(self.waiters.len()); + }, + on_stopped => { + debug!("shutdown"); + }, + // Handle mailbox messages + mail = self.mailbox_receiver.next() => { + let Some(msg) = mail else { + error!("mailbox receiver failed"); break; - }, - - // Handle mailbox messages - mail = self.mailbox_receiver.next() => { - let Some(msg) = mail else { - error!("mailbox receiver failed"); + }; + match msg { + Message::Broadcast{ recipients, message, responder } => { + trace!("mailbox: broadcast"); + self.handle_broadcast(&mut sender, recipients, message, responder).await; + } + Message::Subscribe{ peer, commitment, digest, responder } => { + trace!("mailbox: subscribe"); + self.handle_subscribe(peer, commitment, digest, responder).await; + } + Message::Get{ peer, commitment, digest, responder } => { + trace!("mailbox: get"); + self.handle_get(peer, commitment, digest, responder).await; + } + } + }, + // Handle incoming messages + msg = receiver.recv() => { + // Error handling + let (peer, msg) = match msg { + Ok(r) => r, + Err(err) => { + error!(?err, "receiver failed"); break; - }; - match msg { - Message::Broadcast{ recipients, message, responder } => { - trace!("mailbox: broadcast"); - self.handle_broadcast(&mut sender, recipients, message, responder).await; - } - Message::Subscribe{ peer, commitment, digest, responder } => { - trace!("mailbox: subscribe"); - self.handle_subscribe(peer, commitment, digest, responder).await; - } - Message::Get{ peer, commitment, digest, responder } => { - trace!("mailbox: get"); - self.handle_get(peer, commitment, digest, responder).await; - } } - }, + }; + + // Decode the message + let msg = match msg { + Ok(msg) => msg, + Err(err) => { + warn!(?err, ?peer, "failed to decode message"); + self.metrics.receive.inc(Status::Invalid); + continue; + } + }; - // Handle incoming messages - msg = receiver.recv() => { - // Error handling - let (peer, msg) = match msg { - Ok(r) => r, - Err(err) => { - error!(?err, "receiver failed"); - break; - } - }; - - // Decode the message - let msg = match msg { - Ok(msg) => msg, - Err(err) => { - warn!(?err, ?peer, "failed to decode message"); - self.metrics.receive.inc(Status::Invalid); - continue; - } - }; - - trace!(?peer, "network"); - self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc(); - self.handle_network(peer, msg).await; - }, - } + trace!(?peer, "network"); + self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc(); + self.handle_network(peer, msg).await; + }, 
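Note on the pattern being replaced: before this change, each engine wrapped a `select!` in an explicit `loop`, held a `self.context.stopped()` future for shutdown, and ran per-iteration bookkeeping at the top of the loop body, as visible in the removed lines above. The standalone sketch below reproduces that hand-rolled shape with plain Tokio primitives so the apparent mapping to the new `on_start` / `on_stopped` / `on_end` hooks is easier to follow. The channel, the sleep-based shutdown stand-in, the Tokio dependency, and the hook names in the comments are illustrative assumptions for this sketch only, not the `commonware_macros::select_loop!` implementation or the commonware runtime API.

```rust
// Assumed Cargo dependency for this sketch: tokio = { version = "1", features = ["full"] }
use tokio::{select, sync::mpsc, time::{sleep, Duration}};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(8);
    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(i).await.unwrap();
        }
        // Dropping `tx` closes the channel, which ends the loop below.
    });

    // Stand-in for the runtime's shutdown signal (`self.context.stopped()` in the diff).
    let shutdown = sleep(Duration::from_millis(50));
    tokio::pin!(shutdown);

    loop {
        // "on_start": per-iteration bookkeeping (cleanup, metrics, deadline setup).
        select! {
            _ = &mut shutdown => {
                // "on_stopped": log and exit the loop.
                println!("shutdown");
                break;
            }
            msg = rx.recv() => {
                // Message arm: corresponds to a `pattern = future => { .. }` arm.
                let Some(msg) = msg else { break };
                println!("handled {msg}");
            }
        }
        // "on_end": shared work after any arm that did not break or continue.
    }
}
```

The refactor in this diff appears to fold the explicit shutdown arm and the pre-`select!` bookkeeping into the macro's named hooks, leaving only the message arms at the call site.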
} } diff --git a/consensus/src/aggregation/engine.rs b/consensus/src/aggregation/engine.rs index 53de6d365d..dcae4df0b5 100644 --- a/consensus/src/aggregation/engine.rs +++ b/consensus/src/aggregation/engine.rs @@ -15,7 +15,7 @@ use commonware_cryptography::{ certificate::{Provider, Scheme}, Digest, }; -use commonware_macros::select; +use commonware_macros::select_loop; use commonware_p2p::{ utils::codec::{wrap, WrappedSender}, Blocker, Receiver, Recipients, Sender, @@ -235,7 +235,6 @@ impl< ), ) { let (mut sender, mut receiver) = wrap((), network.0, network.1); - let mut shutdown = self.context.stopped(); // Initialize the epoch let (latest, mut epoch_updates) = self.monitor.subscribe().await; @@ -270,151 +269,147 @@ impl< .expect("current epoch scheme must exist"); self.safe_tip.init(scheme.participants()); - loop { - let _ = self.metrics.tip.try_set(self.tip.get()); - - // Propose a new digest if we are processing less than the window - let next = self.next(); - - // Underflow safe: next >= self.tip is guaranteed by next() - if next.delta_from(self.tip).unwrap() < self.window { - trace!(%next, "requesting new digest"); - assert!(self - .pending - .insert(next, Pending::Unverified(BTreeMap::new())) - .is_none()); - self.get_digest(next); - continue; - } + select_loop! { + self.context, + on_start => { + let _ = self.metrics.tip.try_set(self.tip.get()); + + // Propose a new digest if we are processing less than the window + let next = self.next(); + + // Underflow safe: next >= self.tip is guaranteed by next() + if next.delta_from(self.tip).unwrap() < self.window { + trace!(%next, "requesting new digest"); + assert!(self + .pending + .insert(next, Pending::Unverified(BTreeMap::new())) + .is_none()); + self.get_digest(next); + continue; + } - // Get the rebroadcast deadline for the next height - let rebroadcast = match self.rebroadcast_deadlines.peek() { - Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)), - None => Either::Right(future::pending()), - }; - - // Process the next event - select! 
{ - // Handle shutdown signal - _ = &mut shutdown => { - debug!("shutdown"); + // Get the rebroadcast deadline for the next height + let rebroadcast = match self.rebroadcast_deadlines.peek() { + Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)), + None => Either::Right(future::pending()), + }; + }, + on_stopped => { + debug!("shutdown"); + }, + // Handle refresh epoch deadline + epoch = epoch_updates.next() => { + // Error handling + let Some(epoch) = epoch else { + error!("epoch subscription failed"); break; - }, - - // Handle refresh epoch deadline - epoch = epoch_updates.next() => { - // Error handling - let Some(epoch) = epoch else { - error!("epoch subscription failed"); - break; - }; - - // Refresh the epoch - debug!(current = %self.epoch, new = %epoch, "refresh epoch"); - assert!(epoch >= self.epoch); - self.epoch = epoch; - - // Update the tip manager - let scheme = self.scheme(self.epoch) - .expect("current epoch scheme must exist"); - self.safe_tip.reconcile(scheme.participants()); - - // Update data structures by purging old epochs - let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0); - self.pending.iter_mut().for_each(|(_, pending)| { - match pending { - Pending::Unverified(acks) => { - acks.retain(|epoch, _| *epoch >= min_epoch); - } - Pending::Verified(_, acks) => { - acks.retain(|epoch, _| *epoch >= min_epoch); - } + }; + + // Refresh the epoch + debug!(current = %self.epoch, new = %epoch, "refresh epoch"); + assert!(epoch >= self.epoch); + self.epoch = epoch; + + // Update the tip manager + let scheme = self.scheme(self.epoch) + .expect("current epoch scheme must exist"); + self.safe_tip.reconcile(scheme.participants()); + + // Update data structures by purging old epochs + let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0); + self.pending.iter_mut().for_each(|(_, pending)| { + match pending { + Pending::Unverified(acks) => { + acks.retain(|epoch, _| *epoch >= min_epoch); } - }); - - continue; - }, - - // Sign a new ack - request = self.digest_requests.next_completed() => { - let DigestRequest { height, result, timer } = request; - drop(timer); // Record metric. Explicitly reference timer to avoid lint warning. - match result { - Err(err) => { - warn!(?err, %height, "automaton returned error"); - self.metrics.digest.inc(Status::Dropped); - } - Ok(digest) => { - if let Err(err) = self.handle_digest(height, digest, &mut sender).await { - debug!(?err, %height, "handle_digest failed"); - continue; - } + Pending::Verified(_, acks) => { + acks.retain(|epoch, _| *epoch >= min_epoch); } } - }, - - // Handle incoming acks - msg = receiver.recv() => { - // Error handling - let (sender, msg) = match msg { - Ok(r) => r, - Err(err) => { - warn!(?err, "ack receiver failed"); - break; - } - }; - let mut guard = self.metrics.acks.guard(Status::Invalid); - let TipAck { ack, tip } = match msg { - Ok(peer_ack) => peer_ack, - Err(err) => { - warn!(?err, ?sender, "ack decode failed, blocking peer"); - self.blocker.block(sender).await; + }); + + continue; + }, + + // Sign a new ack + request = self.digest_requests.next_completed() => { + let DigestRequest { height, result, timer } = request; + drop(timer); // Record metric. Explicitly reference timer to avoid lint warning. 
+ match result { + Err(err) => { + warn!(?err, %height, "automaton returned error"); + self.metrics.digest.inc(Status::Dropped); + } + Ok(digest) => { + if let Err(err) = self.handle_digest(height, digest, &mut sender).await { + debug!(?err, %height, "handle_digest failed"); continue; } - }; - - // Update the tip manager - if self.safe_tip.update(sender.clone(), tip).is_some() { - // Fast-forward our tip if needed - let safe_tip = self.safe_tip.get(); - if safe_tip > self.tip { - self.fast_forward_tip(safe_tip).await; - } } - - // Validate that we need to process the ack - if let Err(err) = self.validate_ack(&ack, &sender) { - if err.blockable() { - warn!(?sender, ?err, "blocking peer for validation failure"); - self.blocker.block(sender).await; - } else { - debug!(?sender, ?err, "ack validate failed"); - } + } + }, + + // Handle incoming acks + msg = receiver.recv() => { + // Error handling + let (sender, msg) = match msg { + Ok(r) => r, + Err(err) => { + warn!(?err, "ack receiver failed"); + break; + } + }; + let mut guard = self.metrics.acks.guard(Status::Invalid); + let TipAck { ack, tip } = match msg { + Ok(peer_ack) => peer_ack, + Err(err) => { + warn!(?err, ?sender, "ack decode failed, blocking peer"); + self.blocker.block(sender).await; continue; - }; + } + }; + + // Update the tip manager + if self.safe_tip.update(sender.clone(), tip).is_some() { + // Fast-forward our tip if needed + let safe_tip = self.safe_tip.get(); + if safe_tip > self.tip { + self.fast_forward_tip(safe_tip).await; + } + } - // Handle the ack - if let Err(err) = self.handle_ack(&ack).await { - debug!(?err, ?sender, "ack handle failed"); - guard.set(Status::Failure); - continue; + // Validate that we need to process the ack + if let Err(err) = self.validate_ack(&ack, &sender) { + if err.blockable() { + warn!(?sender, ?err, "blocking peer for validation failure"); + self.blocker.block(sender).await; + } else { + debug!(?sender, ?err, "ack validate failed"); } + continue; + }; - // Update the metrics - debug!(?sender, epoch = %ack.epoch, height = %ack.item.height, "ack"); - guard.set(Status::Success); - }, - - // Rebroadcast - _ = rebroadcast => { - // Get the next height to rebroadcast - let (height, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline"); - trace!(%height, "rebroadcasting"); - if let Err(err) = self.handle_rebroadcast(height, &mut sender).await { - warn!(?err, %height, "rebroadcast failed"); - }; + // Handle the ack + if let Err(err) = self.handle_ack(&ack).await { + debug!(?err, ?sender, "ack handle failed"); + guard.set(Status::Failure); + continue; } - } + + // Update the metrics + debug!(?sender, epoch = %ack.epoch, height = %ack.item.height, "ack"); + guard.set(Status::Success); + }, + + // Rebroadcast + _ = rebroadcast => { + // Get the next height to rebroadcast + let (height, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline"); + trace!(%height, "rebroadcasting"); + if let Err(err) = self.handle_rebroadcast(height, &mut sender).await { + warn!(?err, %height, "rebroadcast failed"); + }; + }, } // Close journal on shutdown diff --git a/consensus/src/marshal/actor.rs b/consensus/src/marshal/actor.rs index 7eb4263ff5..a42aea87bc 100644 --- a/consensus/src/marshal/actor.rs +++ b/consensus/src/marshal/actor.rs @@ -25,7 +25,7 @@ use commonware_cryptography::{ certificate::{Provider, Scheme as CertificateScheme}, PublicKey, }; -use commonware_macros::select; +use commonware_macros::select_loop; use commonware_p2p::Recipients; use 
commonware_parallel::Strategy; use commonware_resolver::Resolver; @@ -306,437 +306,439 @@ where self.try_repair_gaps(&mut buffer, &mut resolver, &mut application) .await; - loop { - // Remove any dropped subscribers. If all subscribers dropped, abort the waiter. - self.block_subscriptions.retain(|_, bs| { - bs.subscribers.retain(|tx| !tx.is_canceled()); - !bs.subscribers.is_empty() - }); - - // Select messages - select! { - // Handle waiter completions first - result = waiters.next_completed() => { - let Ok((commitment, block)) = result else { - continue; // Aborted future - }; - self.notify_subscribers(commitment, &block).await; - }, - // Handle application acknowledgements next - ack = &mut self.pending_ack => { - let PendingAck { height, commitment, .. } = self.pending_ack.take().expect("ack state must be present"); - - match ack { - Ok(()) => { - if let Err(e) = self - .handle_block_processed(height, commitment, &mut resolver) - .await - { - error!(?e, %height, "failed to update application progress"); - return; - } - self.try_dispatch_block(&mut application).await; - } - Err(e) => { - error!(?e, %height, "application did not acknowledge block"); + select_loop! { + self.context, + on_start => { + // Remove any dropped subscribers. If all subscribers dropped, abort the waiter. + self.block_subscriptions.retain(|_, bs| { + bs.subscribers.retain(|tx| !tx.is_canceled()); + !bs.subscribers.is_empty() + }); + }, + on_stopped => { + debug!("context shutdown, stopping marshal"); + }, + // Handle waiter completions first + result = waiters.next_completed() => { + let Ok((commitment, block)) = result else { + continue; // Aborted future + }; + self.notify_subscribers(commitment, &block).await; + }, + // Handle application acknowledgements next + ack = &mut self.pending_ack => { + let PendingAck { height, commitment, .. } = self.pending_ack.take().expect("ack state must be present"); + + match ack { + Ok(()) => { + if let Err(e) = self + .handle_block_processed(height, commitment, &mut resolver) + .await + { + error!(?e, %height, "failed to update application progress"); return; } + self.try_dispatch_block(&mut application).await; } - }, - // Handle consensus inputs before backfill or resolver traffic - mailbox_message = self.mailbox.next() => { - let Some(message) = mailbox_message else { - info!("mailbox closed, shutting down"); + Err(e) => { + error!(?e, %height, "application did not acknowledge block"); return; - }; - match message { - Message::GetInfo { identifier, response } => { - let info = match identifier { - // TODO: Instead of pulling out the entire block, determine the - // height directly from the archive by mapping the commitment to - // the index, which is the same as the height. 
- BlockID::Commitment(commitment) => self - .finalized_blocks - .get(ArchiveID::Key(&commitment)) - .await - .ok() - .flatten() - .map(|b| (b.height(), commitment)), - BlockID::Height(height) => self - .finalizations_by_height - .get(ArchiveID::Index(height.get())) - .await - .ok() - .flatten() - .map(|f| (height, f.proposal.payload)), - BlockID::Latest => self - .get_latest() + } + } + }, + // Handle consensus inputs before backfill or resolver traffic + mailbox_message = self.mailbox.next() => { + let Some(message) = mailbox_message else { + info!("mailbox closed, shutting down"); + break; + }; + match message { + Message::GetInfo { identifier, response } => { + let info = match identifier { + // TODO: Instead of pulling out the entire block, determine the + // height directly from the archive by mapping the commitment to + // the index, which is the same as the height. + BlockID::Commitment(commitment) => self + .finalized_blocks + .get(ArchiveID::Key(&commitment)) .await - .map(|(h, c, _)| (h, c)), - }; - response.send_lossy(info); - } - Message::Proposed { round, block } => { - self.cache_verified(round, block.commitment(), block.clone()).await; - let _peers = buffer.broadcast(Recipients::All, block).await; + .ok() + .flatten() + .map(|b| (b.height(), commitment)), + BlockID::Height(height) => self + .finalizations_by_height + .get(ArchiveID::Index(height.get())) + .await + .ok() + .flatten() + .map(|f| (height, f.proposal.payload)), + BlockID::Latest => self + .get_latest() + .await + .map(|(h, c, _)| (h, c)), + }; + response.send_lossy(info); + } + Message::Proposed { round, block } => { + self.cache_verified(round, block.commitment(), block.clone()).await; + let _peers = buffer.broadcast(Recipients::All, block).await; + } + Message::Verified { round, block } => { + self.cache_verified(round, block.commitment(), block).await; + } + Message::Notarization { notarization } => { + let round = notarization.round(); + let commitment = notarization.proposal.payload; + + // Store notarization by view + self.cache.put_notarization(round, commitment, notarization.clone()).await; + + // Search for block locally, otherwise fetch it remotely + if let Some(block) = self.find_block(&mut buffer, commitment).await { + // If found, persist the block + self.cache_block(round, commitment, block).await; + } else { + debug!(?round, "notarized block missing"); + resolver.fetch(Request::::Notarized { round }).await; } - Message::Verified { round, block } => { - self.cache_verified(round, block.commitment(), block).await; + } + Message::Finalization { finalization } => { + // Cache finalization by round + let round = finalization.round(); + let commitment = finalization.proposal.payload; + self.cache.put_finalization(round, commitment, finalization.clone()).await; + + // Search for block locally, otherwise fetch it remotely + if let Some(block) = self.find_block(&mut buffer, commitment).await { + // If found, persist the block + let height = block.height(); + self.finalize( + height, + commitment, + block, + Some(finalization), + &mut application, + &mut buffer, + &mut resolver, + ) + .await; + debug!(?round, %height, "finalized block stored"); + } else { + // Otherwise, fetch the block from the network. 
+ debug!(?round, ?commitment, "finalized block missing"); + resolver.fetch(Request::::Block(commitment)).await; } - Message::Notarization { notarization } => { - let round = notarization.round(); - let commitment = notarization.proposal.payload; - - // Store notarization by view - self.cache.put_notarization(round, commitment, notarization.clone()).await; - - // Search for block locally, otherwise fetch it remotely - if let Some(block) = self.find_block(&mut buffer, commitment).await { - // If found, persist the block - self.cache_block(round, commitment, block).await; - } else { - debug!(?round, "notarized block missing"); - resolver.fetch(Request::::Notarized { round }).await; + } + Message::GetBlock { identifier, response } => { + match identifier { + BlockID::Commitment(commitment) => { + let result = self.find_block(&mut buffer, commitment).await; + response.send_lossy(result); } - } - Message::Finalization { finalization } => { - // Cache finalization by round - let round = finalization.round(); - let commitment = finalization.proposal.payload; - self.cache.put_finalization(round, commitment, finalization.clone()).await; - - // Search for block locally, otherwise fetch it remotely - if let Some(block) = self.find_block(&mut buffer, commitment).await { - // If found, persist the block - let height = block.height(); - self.finalize( - height, - commitment, - block, - Some(finalization), - &mut application, - &mut buffer, - &mut resolver, - ) - .await; - debug!(?round, %height, "finalized block stored"); - } else { - // Otherwise, fetch the block from the network. - debug!(?round, ?commitment, "finalized block missing"); - resolver.fetch(Request::::Block(commitment)).await; + BlockID::Height(height) => { + let result = self.get_finalized_block(height).await; + response.send_lossy(result); } - } - Message::GetBlock { identifier, response } => { - match identifier { - BlockID::Commitment(commitment) => { - let result = self.find_block(&mut buffer, commitment).await; - response.send_lossy(result); - } - BlockID::Height(height) => { - let result = self.get_finalized_block(height).await; - response.send_lossy(result); - } - BlockID::Latest => { - let block = match self.get_latest().await { - Some((_, commitment, _)) => self.find_block(&mut buffer, commitment).await, - None => None, - }; - response.send_lossy(block); - } + BlockID::Latest => { + let block = match self.get_latest().await { + Some((_, commitment, _)) => self.find_block(&mut buffer, commitment).await, + None => None, + }; + response.send_lossy(block); } } - Message::GetFinalization { height, response } => { - let finalization = self.get_finalization_by_height(height).await; - response.send_lossy(finalization); + } + Message::GetFinalization { height, response } => { + let finalization = self.get_finalization_by_height(height).await; + response.send_lossy(finalization); + } + Message::HintFinalized { height, targets } => { + // Skip if height is at or below the floor + if height <= self.last_processed_height { + continue; } - Message::HintFinalized { height, targets } => { - // Skip if height is at or below the floor - if height <= self.last_processed_height { - continue; - } - // Skip if finalization is already available locally - if self.get_finalization_by_height(height).await.is_some() { - continue; - } + // Skip if finalization is already available locally + if self.get_finalization_by_height(height).await.is_some() { + continue; + } - // Trigger a targeted fetch via the resolver - let request = Request::::Finalized { height 
}; - resolver.fetch_targeted(request, targets).await; + // Trigger a targeted fetch via the resolver + let request = Request::::Finalized { height }; + resolver.fetch_targeted(request, targets).await; + } + Message::Subscribe { round, commitment, response } => { + // Check for block locally + if let Some(block) = self.find_block(&mut buffer, commitment).await { + response.send_lossy(block); + continue; } - Message::Subscribe { round, commitment, response } => { - // Check for block locally - if let Some(block) = self.find_block(&mut buffer, commitment).await { - response.send_lossy(block); + + // We don't have the block locally, so fetch the block from the network + // if we have an associated view. If we only have the digest, don't make + // the request as we wouldn't know when to drop it, and the request may + // never complete if the block is not finalized. + if let Some(round) = round { + if round < self.last_processed_round { + // At this point, we have failed to find the block locally, and + // we know that its round is less than the last processed round. + // This means that something else was finalized in that round, + // so we drop the response to indicate that the block may never + // be available. continue; } + // Attempt to fetch the block (with notarization) from the resolver. + // If this is a valid view, this request should be fine to keep open + // until resolution or pruning (even if the oneshot is canceled). + debug!(?round, ?commitment, "requested block missing"); + resolver.fetch(Request::::Notarized { round }).await; + } - // We don't have the block locally, so fetch the block from the network - // if we have an associated view. If we only have the digest, don't make - // the request as we wouldn't know when to drop it, and the request may - // never complete if the block is not finalized. - if let Some(round) = round { - if round < self.last_processed_round { - // At this point, we have failed to find the block locally, and - // we know that its round is less than the last processed round. - // This means that something else was finalized in that round, - // so we drop the response to indicate that the block may never - // be available. - continue; - } - // Attempt to fetch the block (with notarization) from the resolver. - // If this is a valid view, this request should be fine to keep open - // until resolution or pruning (even if the oneshot is canceled). 
- debug!(?round, ?commitment, "requested block missing"); - resolver.fetch(Request::::Notarized { round }).await; + // Register subscriber + debug!(?round, ?commitment, "registering subscriber"); + match self.block_subscriptions.entry(commitment) { + Entry::Occupied(mut entry) => { + entry.get_mut().subscribers.push(response); } - - // Register subscriber - debug!(?round, ?commitment, "registering subscriber"); - match self.block_subscriptions.entry(commitment) { - Entry::Occupied(mut entry) => { - entry.get_mut().subscribers.push(response); - } - Entry::Vacant(entry) => { - let (tx, rx) = oneshot::channel(); - buffer.subscribe_prepared(None, commitment, None, tx).await; - let aborter = waiters.push(async move { - (commitment, rx.await.expect("buffer subscriber closed")) - }); - entry.insert(BlockSubscription { - subscribers: vec![response], - _aborter: aborter, - }); - } + Entry::Vacant(entry) => { + let (tx, rx) = oneshot::channel(); + buffer.subscribe_prepared(None, commitment, None, tx).await; + let aborter = waiters.push(async move { + (commitment, rx.await.expect("buffer subscriber closed")) + }); + entry.insert(BlockSubscription { + subscribers: vec![response], + _aborter: aborter, + }); } } - Message::SetFloor { height } => { - if self.last_processed_height >= height { - warn!( - %height, - existing = %self.last_processed_height, - "floor not updated, lower than existing" - ); - continue; - } + } + Message::SetFloor { height } => { + if self.last_processed_height >= height { + warn!( + %height, + existing = %self.last_processed_height, + "floor not updated, lower than existing" + ); + continue; + } - // Update the processed height - if let Err(err) = self.set_processed_height(height, &mut resolver).await { - error!(?err, %height, "failed to update floor"); - return; - } + // Update the processed height + if let Err(err) = self.set_processed_height(height, &mut resolver).await { + error!(?err, %height, "failed to update floor"); + return; + } - // Drop the pending acknowledgement, if one exists. We must do this to prevent - // an in-process block from being processed that is below the new floor - // updating `last_processed_height`. - self.pending_ack = None.into(); + // Drop the pending acknowledgement, if one exists. We must do this to prevent + // an in-process block from being processed that is below the new floor + // updating `last_processed_height`. + self.pending_ack = None.into(); - // Prune the finalized block and finalization certificate archives in parallel. - if let Err(err) = self.prune_finalized_archives(height).await { - error!(?err, %height, "failed to prune finalized archives"); - return; - } + // Prune the finalized block and finalization certificate archives in parallel. + if let Err(err) = self.prune_finalized_archives(height).await { + error!(?err, %height, "failed to prune finalized archives"); + return; } - Message::Prune { height } => { - // Only allow pruning at or below the current floor - if height > self.last_processed_height { - warn!(%height, floor = %self.last_processed_height, "prune height above floor, ignoring"); - continue; + } + Message::Prune { height } => { + // Only allow pruning at or below the current floor + if height > self.last_processed_height { + warn!(%height, floor = %self.last_processed_height, "prune height above floor, ignoring"); + continue; + } + + // Prune the finalized block and finalization certificate archives in parallel. 
+ if let Err(err) = self.prune_finalized_archives(height).await { + error!(?err, %height, "failed to prune finalized archives"); + return; + } + } + } + }, + // Handle resolver messages last + message = resolver_rx.next() => { + let Some(message) = message else { + info!("handler closed, shutting down"); + break; + }; + match message { + handler::Message::Produce { key, response } => { + match key { + Request::Block(commitment) => { + // Check for block locally + let Some(block) = self.find_block(&mut buffer, commitment).await else { + debug!(?commitment, "block missing on request"); + continue; + }; + response.send_lossy(block.encode()); } + Request::Finalized { height } => { + // Get finalization + let Some(finalization) = self.get_finalization_by_height(height).await else { + debug!(%height, "finalization missing on request"); + continue; + }; - // Prune the finalized block and finalization certificate archives in parallel. - if let Err(err) = self.prune_finalized_archives(height).await { - error!(?err, %height, "failed to prune finalized archives"); - return; + // Get block + let Some(block) = self.get_finalized_block(height).await else { + debug!(%height, "finalized block missing on request"); + continue; + }; + + // Send finalization + response.send_lossy((finalization, block).encode()); + } + Request::Notarized { round } => { + // Get notarization + let Some(notarization) = self.cache.get_notarization(round).await else { + debug!(?round, "notarization missing on request"); + continue; + }; + + // Get block + let commitment = notarization.proposal.payload; + let Some(block) = self.find_block(&mut buffer, commitment).await else { + debug!(?commitment, "block missing on request"); + continue; + }; + response.send_lossy((notarization, block).encode()); } } - } - }, - // Handle resolver messages last - message = resolver_rx.next() => { - let Some(message) = message else { - info!("handler closed, shutting down"); - return; - }; - match message { - handler::Message::Produce { key, response } => { - match key { - Request::Block(commitment) => { - // Check for block locally - let Some(block) = self.find_block(&mut buffer, commitment).await else { - debug!(?commitment, "block missing on request"); - continue; - }; - response.send_lossy(block.encode()); + }, + handler::Message::Deliver { key, value, response } => { + match key { + Request::Block(commitment) => { + // Parse block + let Ok(block) = B::decode_cfg(value.as_ref(), &self.block_codec_config) else { + response.send_lossy(false); + continue; + }; + + // Validation + if block.commitment() != commitment { + response.send_lossy(false); + continue; } - Request::Finalized { height } => { - // Get finalization - let Some(finalization) = self.get_finalization_by_height(height).await else { - debug!(%height, "finalization missing on request"); - continue; - }; - - // Get block - let Some(block) = self.get_finalized_block(height).await else { - debug!(%height, "finalized block missing on request"); - continue; - }; - - // Send finalization - response.send_lossy((finalization, block).encode()); + + // Persist the block, also persisting the finalization if we have it + let height = block.height(); + let finalization = self.cache.get_finalization_for(commitment).await; + self.finalize( + height, + commitment, + block, + finalization, + &mut application, + &mut buffer, + &mut resolver, + ) + .await; + debug!(?commitment, %height, "received block"); + response.send_lossy(true); + }, + Request::Finalized { height } => { + let Some(bounds) = 
self.epocher.containing(height) else { + response.send_lossy(false); + continue; + }; + let Some(scheme) = self.get_scheme_certificate_verifier(bounds.epoch()) else { + response.send_lossy(false); + continue; + }; + + // Parse finalization + let Ok((finalization, block)) = + <(Finalization, B)>::decode_cfg( + value, + &(scheme.certificate_codec_config(), self.block_codec_config.clone()), + ) + else { + response.send_lossy(false); + continue; + }; + + // Validation + if block.height() != height + || finalization.proposal.payload != block.commitment() + || !finalization.verify(&mut self.context, &scheme, &self.strategy) + { + response.send_lossy(false); + continue; } - Request::Notarized { round } => { - // Get notarization - let Some(notarization) = self.cache.get_notarization(round).await else { - debug!(?round, "notarization missing on request"); - continue; - }; - - // Get block - let commitment = notarization.proposal.payload; - let Some(block) = self.find_block(&mut buffer, commitment).await else { - debug!(?commitment, "block missing on request"); - continue; - }; - response.send_lossy((notarization, block).encode()); + + // Valid finalization received + debug!(%height, "received finalization"); + response.send_lossy(true); + self.finalize( + height, + block.commitment(), + block, + Some(finalization), + &mut application, + &mut buffer, + &mut resolver, + ) + .await; + }, + Request::Notarized { round } => { + let Some(scheme) = self.get_scheme_certificate_verifier(round.epoch()) else { + response.send_lossy(false); + continue; + }; + + // Parse notarization + let Ok((notarization, block)) = + <(Notarization, B)>::decode_cfg( + value, + &(scheme.certificate_codec_config(), self.block_codec_config.clone()), + ) + else { + response.send_lossy(false); + continue; + }; + + // Validation + if notarization.round() != round + || notarization.proposal.payload != block.commitment() + || !notarization.verify(&mut self.context, &scheme, &self.strategy) + { + response.send_lossy(false); + continue; } - } - }, - handler::Message::Deliver { key, value, response } => { - match key { - Request::Block(commitment) => { - // Parse block - let Ok(block) = B::decode_cfg(value.as_ref(), &self.block_codec_config) else { - response.send_lossy(false); - continue; - }; - - // Validation - if block.commitment() != commitment { - response.send_lossy(false); - continue; - } - - // Persist the block, also persisting the finalization if we have it - let height = block.height(); - let finalization = self.cache.get_finalization_for(commitment).await; + + // Valid notarization received + response.send_lossy(true); + let commitment = block.commitment(); + debug!(?round, ?commitment, "received notarization"); + + // If there exists a finalization certificate for this block, we + // should finalize it. While not necessary, this could finalize + // the block faster in the case where a notarization then a + // finalization is received via the consensus engine and we + // resolve the request for the notarization before we resolve + // the request for the block. 
+ let height = block.height(); + if let Some(finalization) = self.cache.get_finalization_for(commitment).await { self.finalize( height, commitment, - block, - finalization, - &mut application, - &mut buffer, - &mut resolver, - ) - .await; - debug!(?commitment, %height, "received block"); - response.send_lossy(true); - }, - Request::Finalized { height } => { - let Some(bounds) = self.epocher.containing(height) else { - response.send_lossy(false); - continue; - }; - let Some(scheme) = self.get_scheme_certificate_verifier(bounds.epoch()) else { - response.send_lossy(false); - continue; - }; - - // Parse finalization - let Ok((finalization, block)) = - <(Finalization, B)>::decode_cfg( - value, - &(scheme.certificate_codec_config(), self.block_codec_config.clone()), - ) - else { - response.send_lossy(false); - continue; - }; - - // Validation - if block.height() != height - || finalization.proposal.payload != block.commitment() - || !finalization.verify(&mut self.context, &scheme, &self.strategy) - { - response.send_lossy(false); - continue; - } - - // Valid finalization received - debug!(%height, "received finalization"); - response.send_lossy(true); - self.finalize( - height, - block.commitment(), - block, + block.clone(), Some(finalization), &mut application, &mut buffer, &mut resolver, ) .await; - }, - Request::Notarized { round } => { - let Some(scheme) = self.get_scheme_certificate_verifier(round.epoch()) else { - response.send_lossy(false); - continue; - }; - - // Parse notarization - let Ok((notarization, block)) = - <(Notarization, B)>::decode_cfg( - value, - &(scheme.certificate_codec_config(), self.block_codec_config.clone()), - ) - else { - response.send_lossy(false); - continue; - }; - - // Validation - if notarization.round() != round - || notarization.proposal.payload != block.commitment() - || !notarization.verify(&mut self.context, &scheme, &self.strategy) - { - response.send_lossy(false); - continue; - } - - // Valid notarization received - response.send_lossy(true); - let commitment = block.commitment(); - debug!(?round, ?commitment, "received notarization"); - - // If there exists a finalization certificate for this block, we - // should finalize it. While not necessary, this could finalize - // the block faster in the case where a notarization then a - // finalization is received via the consensus engine and we - // resolve the request for the notarization before we resolve - // the request for the block. 
- let height = block.height(); - if let Some(finalization) = self.cache.get_finalization_for(commitment).await { - self.finalize( - height, - commitment, - block.clone(), - Some(finalization), - &mut application, - &mut buffer, - &mut resolver, - ) - .await; - } - - // Cache the notarization and block - self.cache_block(round, commitment, block).await; - self.cache.put_notarization(round, commitment, notarization).await; - }, - } - }, - } - }, - } + } + + // Cache the notarization and block + self.cache_block(round, commitment, block).await; + self.cache.put_notarization(round, commitment, notarization).await; + }, + } + }, + } + }, } } diff --git a/consensus/src/ordered_broadcast/engine.rs b/consensus/src/ordered_broadcast/engine.rs index cf546c8343..495050f38f 100644 --- a/consensus/src/ordered_broadcast/engine.rs +++ b/consensus/src/ordered_broadcast/engine.rs @@ -24,7 +24,7 @@ use commonware_cryptography::{ certificate::{Provider, Scheme}, Digest, PublicKey, Signer, }; -use commonware_macros::select; +use commonware_macros::select_loop; use commonware_p2p::{ utils::codec::{wrap, WrappedSender}, Receiver, Recipients, Sender, @@ -291,7 +291,6 @@ impl< let mut node_sender = chunk_network.0; let mut node_receiver = chunk_network.1; let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1); - let mut shutdown = self.context.stopped(); // Tracks if there is an outstanding proposal request to the automaton. let mut pending: Option<(Context, oneshot::Receiver)> = None; @@ -310,180 +309,176 @@ impl< } } - loop { - // Request a new proposal if necessary - if pending.is_none() { - if let Some(context) = self.should_propose() { - let receiver = self.automaton.propose(context.clone()).await; - pending = Some((context, receiver)); + select_loop! { + self.context, + on_start => { + // Request a new proposal if necessary + if pending.is_none() { + if let Some(context) = self.should_propose() { + let receiver = self.automaton.propose(context.clone()).await; + pending = Some((context, receiver)); + } } - } - - // Create deadline futures. - // - // If the deadline is None, the future will never resolve. - let rebroadcast = match self.rebroadcast_deadline { - Some(deadline) => Either::Left(self.context.sleep_until(deadline)), - None => Either::Right(future::pending()), - }; - let propose = match &mut pending { - Some((_context, receiver)) => Either::Left(receiver), - None => Either::Right(futures::future::pending()), - }; - // Process the next event - select! { - // Handle shutdown signal - _ = &mut shutdown => { - debug!("shutdown"); + // Create deadline futures. + // + // If the deadline is None, the future will never resolve. 
+ let rebroadcast = match self.rebroadcast_deadline { + Some(deadline) => Either::Left(self.context.sleep_until(deadline)), + None => Either::Right(future::pending()), + }; + let propose = match &mut pending { + Some((_context, receiver)) => Either::Left(receiver), + None => Either::Right(futures::future::pending()), + }; + }, + on_stopped => { + debug!("shutdown"); + }, + // Handle refresh epoch deadline + epoch = epoch_updates.next() => { + // Error handling + let Some(epoch) = epoch else { + error!("epoch subscription failed"); break; - }, + }; + + // Refresh the epoch + debug!(current = %self.epoch, new = %epoch, "refresh epoch"); + assert!(epoch >= self.epoch); + self.epoch = epoch; + continue; + }, + + // Handle rebroadcast deadline + _ = rebroadcast => { + if let Some(ref signer) = self.sequencer_signer { + debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast"); + if let Err(err) = self.rebroadcast(&mut node_sender).await { + info!(?err, "rebroadcast failed"); + continue; + } + } + }, - // Handle refresh epoch deadline - epoch = epoch_updates.next() => { - // Error handling - let Some(epoch) = epoch else { - error!("epoch subscription failed"); - break; - }; + // Propose a new chunk + receiver = propose => { + // Clear the pending proposal + let (context, _) = pending.take().unwrap(); + debug!(height = %context.height, "propose"); - // Refresh the epoch - debug!(current = %self.epoch, new = %epoch, "refresh epoch"); - assert!(epoch >= self.epoch); - self.epoch = epoch; + // Error handling for dropped proposals + let Ok(payload) = receiver else { + warn!(?context, "automaton dropped proposal"); continue; - }, - - // Handle rebroadcast deadline - _ = rebroadcast => { - if let Some(ref signer) = self.sequencer_signer { - debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast"); - if let Err(err) = self.rebroadcast(&mut node_sender).await { - info!(?err, "rebroadcast failed"); - continue; - } - } - }, - - // Propose a new chunk - receiver = propose => { - // Clear the pending proposal - let (context, _) = pending.take().unwrap(); - debug!(height = %context.height, "propose"); + }; - // Error handling for dropped proposals - let Ok(payload) = receiver else { - warn!(?context, "automaton dropped proposal"); + // Propose the chunk + if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await { + warn!(?err, ?context, "propose new failed"); + continue; + } + }, + + // Handle incoming nodes + msg = node_receiver.recv() => { + // Error handling + let (sender, msg) = match msg { + Ok(r) => r, + Err(err) => { + error!(?err, "node receiver failed"); + break; + } + }; + let mut guard = self.metrics.nodes.guard(Status::Invalid); + + // Decode using staged decoding with epoch-aware certificate bounds + let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) { + Ok(node) => node, + Err(err) => { + debug!(?err, ?sender, "node decode failed"); continue; - }; - - // Propose the chunk - if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await { - warn!(?err, ?context, "propose new failed"); + } + }; + let result = match self.validate_node(&node, &sender) { + Ok(result) => result, + Err(err) => { + debug!(?err, ?sender, "node validate failed"); continue; } - }, - - // Handle incoming nodes - msg = node_receiver.recv() => { - // Error handling - let (sender, msg) = match msg { - Ok(r) => r, - Err(err) => { - error!(?err, "node receiver failed"); - break; - } - }; - let mut guard = 
self.metrics.nodes.guard(Status::Invalid); - - // Decode using staged decoding with epoch-aware certificate bounds - let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) { - Ok(node) => node, - Err(err) => { - debug!(?err, ?sender, "node decode failed"); - continue; - } - }; - let result = match self.validate_node(&node, &sender) { - Ok(result) => result, - Err(err) => { - debug!(?err, ?sender, "node validate failed"); - continue; - } - }; + }; - // Initialize journal for sequencer if it does not exist - self.journal_prepare(&sender).await; + // Initialize journal for sequencer if it does not exist + self.journal_prepare(&sender).await; - // Handle the parent certificate - if let Some(parent_chunk) = result { - let parent = node.parent.as_ref().unwrap(); - self.handle_certificate(&parent_chunk, parent.epoch, parent.certificate.clone()).await; - } + // Handle the parent certificate + if let Some(parent_chunk) = result { + let parent = node.parent.as_ref().unwrap(); + self.handle_certificate(&parent_chunk, parent.epoch, parent.certificate.clone()).await; + } - // Process the node - // - // Note, this node may be a duplicate. If it is, we will attempt to verify it and vote - // on it again (our original vote may have been lost). - self.handle_node(&node).await; - debug!(?sender, height = %node.chunk.height, "node"); - guard.set(Status::Success); - }, - - // Handle incoming acks - msg = ack_receiver.recv() => { - // Error handling - let (sender, msg) = match msg { - Ok(r) => r, - Err(err) => { - warn!(?err, "ack receiver failed"); - break; - } - }; - let mut guard = self.metrics.acks.guard(Status::Invalid); - let ack = match msg { - Ok(ack) => ack, - Err(err) => { - debug!(?err, ?sender, "ack decode failed"); - continue; - } - }; - if let Err(err) = self.validate_ack(&ack, &sender) { - debug!(?err, ?sender, "ack validate failed"); - continue; - }; - if let Err(err) = self.handle_ack(&ack).await { - debug!(?err, ?sender, "ack handle failed"); - guard.set(Status::Failure); + // Process the node + // + // Note, this node may be a duplicate. If it is, we will attempt to verify it and vote + // on it again (our original vote may have been lost). + self.handle_node(&node).await; + debug!(?sender, height = %node.chunk.height, "node"); + guard.set(Status::Success); + }, + + // Handle incoming acks + msg = ack_receiver.recv() => { + // Error handling + let (sender, msg) = match msg { + Ok(r) => r, + Err(err) => { + warn!(?err, "ack receiver failed"); + break; + } + }; + let mut guard = self.metrics.acks.guard(Status::Invalid); + let ack = match msg { + Ok(ack) => ack, + Err(err) => { + debug!(?err, ?sender, "ack decode failed"); continue; } - debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "ack"); - guard.set(Status::Success); - }, - - // Handle completed verification futures. - verify = self.pending_verifies.next_completed() => { - let Verify { timer, context, payload, result } = verify; - drop(timer); // Record metric. Explicitly reference timer to avoid lint warning. 
- match result { - Err(err) => { - warn!(?err, ?context, "verified returned error"); - self.metrics.verify.inc(Status::Dropped); - } - Ok(false) => { - debug!(?context, "verified was false"); - self.metrics.verify.inc(Status::Failure); - } - Ok(true) => { - debug!(?context, "verified"); - self.metrics.verify.inc(Status::Success); - if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await { - debug!(?err, ?context, ?payload, "verified handle failed"); - } - }, + }; + if let Err(err) = self.validate_ack(&ack, &sender) { + debug!(?err, ?sender, "ack validate failed"); + continue; + }; + if let Err(err) = self.handle_ack(&ack).await { + debug!(?err, ?sender, "ack handle failed"); + guard.set(Status::Failure); + continue; + } + debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "ack"); + guard.set(Status::Success); + }, + + // Handle completed verification futures. + verify = self.pending_verifies.next_completed() => { + let Verify { timer, context, payload, result } = verify; + drop(timer); // Record metric. Explicitly reference timer to avoid lint warning. + match result { + Err(err) => { + warn!(?err, ?context, "verified returned error"); + self.metrics.verify.inc(Status::Dropped); } - }, - } + Ok(false) => { + debug!(?context, "verified was false"); + self.metrics.verify.inc(Status::Failure); + } + Ok(true) => { + debug!(?context, "verified"); + self.metrics.verify.inc(Status::Success); + if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await { + debug!(?err, ?context, ?payload, "verified handle failed"); + } + }, + } + }, } // Sync and drop all journals, regardless of how we exit the loop diff --git a/consensus/src/simplex/actors/batcher/actor.rs b/consensus/src/simplex/actors/batcher/actor.rs index c45405bd7c..d6bcb5bbf8 100644 --- a/consensus/src/simplex/actors/batcher/actor.rs +++ b/consensus/src/simplex/actors/batcher/actor.rs @@ -11,7 +11,7 @@ use crate::{ Epochable, Reporter, Viewable, }; use commonware_cryptography::Digest; -use commonware_macros::select; +use commonware_macros::select_loop; use commonware_p2p::{utils::codec::WrappedReceiver, Blocker, Receiver}; use commonware_parallel::Strategy; use commonware_runtime::{ @@ -187,365 +187,364 @@ impl< let mut current = View::zero(); let mut finalized = View::zero(); let mut work = BTreeMap::new(); - let mut shutdown = self.context.stopped(); - loop { - // Track which view was modified (if any) for certificate construction - let updated_view; - - // Handle next message - select! 
{ - _ = &mut shutdown => { - debug!("context shutdown, stopping batcher"); - break; - }, - message = self.mailbox_receiver.next() => { - match message { - Some(Message::Update { - current: new_current, - leader, - finalized: new_finalized, - active, - }) => { - current = new_current; - finalized = new_finalized; - work - .entry(current) - .or_insert_with(|| self.new_round()) - .set_leader(leader); - - // Check if the leader has been active recently - let skip_timeout = self.skip_timeout.get() as usize; - let is_active = - // Ensure we have enough data to judge activity (none of this - // data may be in the last skip_timeout views if we jumped ahead - // to a new view) - work.len() < skip_timeout - // Leader active in at least one recent round - || work.iter().rev().take(skip_timeout).any(|(_, round)| round.is_active(leader)); - active.send_lossy(is_active); - - // Setting leader may enable batch verification - updated_view = current; - } - Some(Message::Constructed(message)) => { - // If the view isn't interesting, we can skip - let view = message.view(); - if !interesting( - self.activity_timeout, - finalized, - current, - view, - false, - ) { - continue; - } - - // Add the message to the verifier - work.entry(view) - .or_insert_with(|| self.new_round()) - .add_constructed(message) - .await; - self.added.inc(); - updated_view = view; - } - None => { - break; + select_loop! { + self.context, + on_start => { + // Track which view was modified (if any) for certificate construction + let updated_view; + }, + on_stopped => { + debug!("context shutdown, stopping batcher"); + }, + message = self.mailbox_receiver.next() => { + match message { + Some(Message::Update { + current: new_current, + leader, + finalized: new_finalized, + active, + }) => { + current = new_current; + finalized = new_finalized; + work + .entry(current) + .or_insert_with(|| self.new_round()) + .set_leader(leader); + + // Check if the leader has been active recently + let skip_timeout = self.skip_timeout.get() as usize; + let is_active = + // Ensure we have enough data to judge activity (none of this + // data may be in the last skip_timeout views if we jumped ahead + // to a new view) + work.len() < skip_timeout + // Leader active in at least one recent round + || work.iter().rev().take(skip_timeout).any(|(_, round)| round.is_active(leader)); + active.send_lossy(is_active); + + // Setting leader may enable batch verification + updated_view = current; + } + Some(Message::Constructed(message)) => { + // If the view isn't interesting, we can skip + let view = message.view(); + if !interesting( + self.activity_timeout, + finalized, + current, + view, + false, + ) { + continue; } + + // Add the message to the verifier + work.entry(view) + .or_insert_with(|| self.new_round()) + .add_constructed(message) + .await; + self.added.inc(); + updated_view = view; } - }, - // Handle certificates from the network - message = certificate_receiver.recv() => { - // If the channel is closed, we should exit - let Ok((sender, message)) = message else { + None => { break; - }; - - // If there is a decoding error, block - let Ok(message) = message else { - warn!(?sender, "blocking peer for decoding error"); - self.blocker.block(sender).await; - continue; - }; - - // Update metrics - let label = match &message { - Certificate::Notarization(_) => Inbound::notarization(&sender), - Certificate::Nullification(_) => Inbound::nullification(&sender), - Certificate::Finalization(_) => Inbound::finalization(&sender), - }; - 
self.inbound_messages.get_or_create(&label).inc(); - - // If the epoch is not the current epoch, block - if message.epoch() != self.epoch { - warn!(?sender, "blocking peer for epoch mismatch"); - self.blocker.block(sender).await; - continue; } + } + }, + // Handle certificates from the network + message = certificate_receiver.recv() => { + // If the channel is closed, we should exit + let Ok((sender, message)) = message else { + break; + }; + + // If there is a decoding error, block + let Ok(message) = message else { + warn!(?sender, "blocking peer for decoding error"); + self.blocker.block(sender).await; + continue; + }; + + // Update metrics + let label = match &message { + Certificate::Notarization(_) => Inbound::notarization(&sender), + Certificate::Nullification(_) => Inbound::nullification(&sender), + Certificate::Finalization(_) => Inbound::finalization(&sender), + }; + self.inbound_messages.get_or_create(&label).inc(); + + // If the epoch is not the current epoch, block + if message.epoch() != self.epoch { + warn!(?sender, "blocking peer for epoch mismatch"); + self.blocker.block(sender).await; + continue; + } + + // Allow future certificates (they advance our view) + let view = message.view(); + if !interesting( + self.activity_timeout, + finalized, + current, + view, + true, // allow future + ) { + continue; + } + + match message { + Certificate::Notarization(notarization) => { + // Skip if we already have a notarization for this view + if work.get(&view).is_some_and(|r| r.has_notarization()) { + trace!(%view, "skipping duplicate notarization"); + continue; + } + + // Verify the certificate + if !notarization.verify( + &mut self.context, + &self.scheme, + &self.strategy, + ) { + warn!(?sender, %view, "blocking peer for invalid notarization"); + self.blocker.block(sender).await; + continue; + } - // Allow future certificates (they advance our view) - let view = message.view(); - if !interesting( - self.activity_timeout, - finalized, - current, - view, - true, // allow future - ) { - continue; + // Store and forward to voter + work + .entry(view) + .or_insert_with(|| self.new_round()) + .set_notarization(notarization.clone()); + voter + .recovered(Certificate::Notarization(notarization)) + .await; } + Certificate::Nullification(nullification) => { + // Skip if we already have a nullification for this view + if work.get(&view).is_some_and(|r| r.has_nullification()) { + trace!(%view, "skipping duplicate nullification"); + continue; + } - match message { - Certificate::Notarization(notarization) => { - // Skip if we already have a notarization for this view - if work.get(&view).is_some_and(|r| r.has_notarization()) { - trace!(%view, "skipping duplicate notarization"); - continue; - } - - // Verify the certificate - if !notarization.verify( - &mut self.context, - &self.scheme, - &self.strategy, - ) { - warn!(?sender, %view, "blocking peer for invalid notarization"); - self.blocker.block(sender).await; - continue; - } - - // Store and forward to voter - work - .entry(view) - .or_insert_with(|| self.new_round()) - .set_notarization(notarization.clone()); - voter - .recovered(Certificate::Notarization(notarization)) - .await; + // Verify the certificate + if !nullification.verify::<_, D>( + &mut self.context, + &self.scheme, + &self.strategy, + ) { + warn!(?sender, %view, "blocking peer for invalid nullification"); + self.blocker.block(sender).await; + continue; } - Certificate::Nullification(nullification) => { - // Skip if we already have a nullification for this view - if 
work.get(&view).is_some_and(|r| r.has_nullification()) { - trace!(%view, "skipping duplicate nullification"); - continue; - } - - // Verify the certificate - if !nullification.verify::<_, D>( - &mut self.context, - &self.scheme, - &self.strategy, - ) { - warn!(?sender, %view, "blocking peer for invalid nullification"); - self.blocker.block(sender).await; - continue; - } - - // Store and forward to voter - work - .entry(view) - .or_insert_with(|| self.new_round()) - .set_nullification(nullification.clone()); - voter - .recovered(Certificate::Nullification(nullification)) - .await; + + // Store and forward to voter + work + .entry(view) + .or_insert_with(|| self.new_round()) + .set_nullification(nullification.clone()); + voter + .recovered(Certificate::Nullification(nullification)) + .await; + } + Certificate::Finalization(finalization) => { + // Skip if we already have a finalization for this view + if work.get(&view).is_some_and(|r| r.has_finalization()) { + trace!(%view, "skipping duplicate finalization"); + continue; } - Certificate::Finalization(finalization) => { - // Skip if we already have a finalization for this view - if work.get(&view).is_some_and(|r| r.has_finalization()) { - trace!(%view, "skipping duplicate finalization"); - continue; - } - - // Verify the certificate - if !finalization.verify( - &mut self.context, - &self.scheme, - &self.strategy, - ) { - warn!(?sender, %view, "blocking peer for invalid finalization"); - self.blocker.block(sender).await; - continue; - } - - // Store and forward to voter - work - .entry(view) - .or_insert_with(|| self.new_round()) - .set_finalization(finalization.clone()); - voter - .recovered(Certificate::Finalization(finalization)) - .await; + + // Verify the certificate + if !finalization.verify( + &mut self.context, + &self.scheme, + &self.strategy, + ) { + warn!(?sender, %view, "blocking peer for invalid finalization"); + self.blocker.block(sender).await; + continue; } + + // Store and forward to voter + work + .entry(view) + .or_insert_with(|| self.new_round()) + .set_finalization(finalization.clone()); + voter + .recovered(Certificate::Finalization(finalization)) + .await; } + } - // Certificates are already forwarded to voter, no need for construction + // Certificates are already forwarded to voter, no need for construction + continue; + }, + // Handle votes from the network + message = vote_receiver.recv() => { + // If the channel is closed, we should exit + let Ok((sender, message)) = message else { + break; + }; + + // If there is a decoding error, block + let Ok(message) = message else { + warn!(?sender, "blocking peer for decoding error"); + self.blocker.block(sender).await; continue; - }, - // Handle votes from the network - message = vote_receiver.recv() => { - // If the channel is closed, we should exit - let Ok((sender, message)) = message else { - break; - }; - - // If there is a decoding error, block - let Ok(message) = message else { - warn!(?sender, "blocking peer for decoding error"); - self.blocker.block(sender).await; - continue; - }; - - // Update metrics - let label = match &message { - Vote::Notarize(_) => Inbound::notarize(&sender), - Vote::Nullify(_) => Inbound::nullify(&sender), - Vote::Finalize(_) => Inbound::finalize(&sender), - }; - self.inbound_messages.get_or_create(&label).inc(); - - // If the epoch is not the current epoch, block - if message.epoch() != self.epoch { - warn!(?sender, "blocking peer for epoch mismatch"); - self.blocker.block(sender).await; - continue; - } + }; + + // Update metrics + let 
label = match &message { + Vote::Notarize(_) => Inbound::notarize(&sender), + Vote::Nullify(_) => Inbound::nullify(&sender), + Vote::Finalize(_) => Inbound::finalize(&sender), + }; + self.inbound_messages.get_or_create(&label).inc(); + + // If the epoch is not the current epoch, block + if message.epoch() != self.epoch { + warn!(?sender, "blocking peer for epoch mismatch"); + self.blocker.block(sender).await; + continue; + } + + // If the view isn't interesting, we can skip + let view = message.view(); + if !interesting( + self.activity_timeout, + finalized, + current, + view, + false, + ) { + continue; + } - // If the view isn't interesting, we can skip - let view = message.view(); - if !interesting( - self.activity_timeout, - finalized, - current, - view, - false, - ) { - continue; + // Add the vote to the verifier + let peer = Peer::new(&sender); + if work + .entry(view) + .or_insert_with(|| self.new_round()) + .add_network(sender, message) + .await { + self.added.inc(); + + // Update per-peer latest vote metric (only if higher than current) + let _ = self + .latest_vote + .get_or_create(&peer) + .try_set_max(view.get()); } + updated_view = view; + }, + on_end => { + assert!( + updated_view != View::zero(), + "updated view must be greater than zero" + ); - // Add the vote to the verifier - let peer = Peer::new(&sender); - if work - .entry(view) - .or_insert_with(|| self.new_round()) - .add_network(sender, message) - .await { - self.added.inc(); - - // Update per-peer latest vote metric (only if higher than current) - let _ = self - .latest_vote - .get_or_create(&peer) - .try_set_max(view.get()); + // Forward leader's proposal to voter (if we're not the leader and haven't already) + if let Some(round) = work.get_mut(¤t) { + if let Some(me) = self.scheme.me() { + if let Some(proposal) = round.forward_proposal(me) { + voter.proposal(proposal).await; } - updated_view = view; - }, - } - assert!( - updated_view != View::zero(), - "updated view must be greater than zero" - ); - - // Forward leader's proposal to voter (if we're not the leader and haven't already) - if let Some(round) = work.get_mut(¤t) { - if let Some(me) = self.scheme.me() { - if let Some(proposal) = round.forward_proposal(me) { - voter.proposal(proposal).await; } } - } - - // Skip verification and construction for views at or below finalized. - // - // We still use interesting() for filtering votes because we want to - // notify the reporter of all votes within the activity timeout (even - // if we don't need them in the voter). 
- if updated_view <= finalized { - continue; - } - // Process the updated view (if any) - let Some(round) = work.get_mut(&updated_view) else { - continue; - }; - - // Batch verify votes if ready - let mut timer = self.verify_latency.timer(); - let verified = if round.ready_notarizes() { - Some(round.verify_notarizes(&mut self.context, &self.strategy)) - } else if round.ready_nullifies() { - Some(round.verify_nullifies(&mut self.context, &self.strategy)) - } else if round.ready_finalizes() { - Some(round.verify_finalizes(&mut self.context, &self.strategy)) - } else { - None - }; - - // Process batch verification results - if let Some((voters, failed)) = verified { - timer.observe(); - - // Process verified votes - let batch = voters.len() + failed.len(); - trace!(view = %updated_view, batch, "batch verified votes"); - self.verified.inc_by(batch as u64); - self.batch_size.observe(batch as f64); - - // Block invalid signers - for invalid in failed { - if let Some(signer) = self.participants.key(invalid) { - warn!(?signer, "blocking peer for invalid signature"); - self.blocker.block(signer.clone()).await; + // Skip verification and construction for views at or below finalized. + // + // We still use interesting() for filtering votes because we want to + // notify the reporter of all votes within the activity timeout (even + // if we don't need them in the voter). + if updated_view <= finalized { + continue; + } + + // Process the updated view (if any) + let Some(round) = work.get_mut(&updated_view) else { + continue; + }; + + // Batch verify votes if ready + let mut timer = self.verify_latency.timer(); + let verified = if round.ready_notarizes() { + Some(round.verify_notarizes(&mut self.context, &self.strategy)) + } else if round.ready_nullifies() { + Some(round.verify_nullifies(&mut self.context, &self.strategy)) + } else if round.ready_finalizes() { + Some(round.verify_finalizes(&mut self.context, &self.strategy)) + } else { + None + }; + + // Process batch verification results + if let Some((voters, failed)) = verified { + timer.observe(); + + // Process verified votes + let batch = voters.len() + failed.len(); + trace!(view = %updated_view, batch, "batch verified votes"); + self.verified.inc_by(batch as u64); + self.batch_size.observe(batch as f64); + + // Block invalid signers + for invalid in failed { + if let Some(signer) = self.participants.key(invalid) { + warn!(?signer, "blocking peer for invalid signature"); + self.blocker.block(signer.clone()).await; + } + } + + // Store verified votes for certificate construction + for valid in voters { + round.add_verified(valid); } + } else { + timer.cancel(); + trace!( + %current, + %finalized, + "no verifier ready" + ); } - // Store verified votes for certificate construction - for valid in voters { - round.add_verified(valid); + // Try to construct and forward certificates + if let Some(notarization) = self + .recover_latency + .time_some(|| round.try_construct_notarization(&self.scheme, &self.strategy)) + { + debug!(view = %updated_view, "constructed notarization, forwarding to voter"); + voter + .recovered(Certificate::Notarization(notarization)) + .await; } - } else { - timer.cancel(); - trace!( - %current, - %finalized, - "no verifier ready" - ); - } - - // Try to construct and forward certificates - if let Some(notarization) = self - .recover_latency - .time_some(|| round.try_construct_notarization(&self.scheme, &self.strategy)) - { - debug!(view = %updated_view, "constructed notarization, forwarding to voter"); - voter - 
.recovered(Certificate::Notarization(notarization)) - .await; - } - if let Some(nullification) = self - .recover_latency - .time_some(|| round.try_construct_nullification(&self.scheme, &self.strategy)) - { - debug!(view = %updated_view, "constructed nullification, forwarding to voter"); - voter - .recovered(Certificate::Nullification(nullification)) - .await; - } - if let Some(finalization) = self - .recover_latency - .time_some(|| round.try_construct_finalization(&self.scheme, &self.strategy)) - { - debug!(view = %updated_view, "constructed finalization, forwarding to voter"); - voter - .recovered(Certificate::Finalization(finalization)) - .await; - } - - // Drop any rounds that are no longer interesting - while work.first_key_value().is_some_and(|(&view, _)| { - !interesting(self.activity_timeout, finalized, current, view, false) - }) { - work.pop_first(); - } + if let Some(nullification) = self + .recover_latency + .time_some(|| round.try_construct_nullification(&self.scheme, &self.strategy)) + { + debug!(view = %updated_view, "constructed nullification, forwarding to voter"); + voter + .recovered(Certificate::Nullification(nullification)) + .await; + } + if let Some(finalization) = self + .recover_latency + .time_some(|| round.try_construct_finalization(&self.scheme, &self.strategy)) + { + debug!(view = %updated_view, "constructed finalization, forwarding to voter"); + voter + .recovered(Certificate::Finalization(finalization)) + .await; + } + + // Drop any rounds that are no longer interesting + while work.first_key_value().is_some_and(|(&view, _)| { + !interesting(self.activity_timeout, finalized, current, view, false) + }) { + work.pop_first(); + } + }, } } } diff --git a/consensus/src/simplex/actors/voter/actor.rs b/consensus/src/simplex/actors/voter/actor.rs index d8b2cc559f..be20183adb 100644 --- a/consensus/src/simplex/actors/voter/actor.rs +++ b/consensus/src/simplex/actors/voter/actor.rs @@ -19,7 +19,7 @@ use crate::{ }; use commonware_codec::Read; use commonware_cryptography::Digest; -use commonware_macros::select; +use commonware_macros::select_loop; use commonware_p2p::{utils::codec::WrappedSender, Blocker, Recipients, Sender}; use commonware_runtime::{ buffer::PoolRef, spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage, @@ -786,261 +786,256 @@ impl< .update(observed_view, leader, self.state.last_finalized()) .await; - // Create shutdown tracker - let mut shutdown = self.context.stopped(); - // Process messages let mut pending_propose: Option, D>> = None; let mut pending_verify: Option, bool>> = None; let mut certify_pool: AbortablePool<(Rnd, Result)> = Default::default(); - loop { - // Drop any pending items if we have moved to a new view - if let Some(ref pp) = pending_propose { - if pp.view() != self.state.current_view() { - pending_propose = None; + select_loop! 
{ + self.context, + on_start => { + // Drop any pending items if we have moved to a new view + if let Some(ref pp) = pending_propose { + if pp.view() != self.state.current_view() { + pending_propose = None; + } } - } - if let Some(ref pv) = pending_verify { - if pv.view() != self.state.current_view() { - pending_verify = None; + if let Some(ref pv) = pending_verify { + if pv.view() != self.state.current_view() { + pending_verify = None; + } } - } - // If needed, propose a container - if pending_propose.is_none() { - pending_propose = self.try_propose().await; - } + // If needed, propose a container + if pending_propose.is_none() { + pending_propose = self.try_propose().await; + } - // If needed, verify current view - if pending_verify.is_none() { - pending_verify = self.try_verify().await; - } + // If needed, verify current view + if pending_verify.is_none() { + pending_verify = self.try_verify().await; + } - // Attempt to certify any views that we have notarizations for. - for proposal in self.state.certify_candidates() { - let round = proposal.round; - let view = round.view(); - debug!(%view, "attempting certification"); - let receiver = self.automaton.certify(round, proposal.payload).await; - let handle = certify_pool.push(async move { (round, receiver.await) }); - self.state.set_certify_handle(view, handle); - } + // Attempt to certify any views that we have notarizations for. + for proposal in self.state.certify_candidates() { + let round = proposal.round; + let view = round.view(); + debug!(%view, "attempting certification"); + let receiver = self.automaton.certify(round, proposal.payload).await; + let handle = certify_pool.push(async move { (round, receiver.await) }); + self.state.set_certify_handle(view, handle); + } - // Prepare waiters - let propose_wait = Waiter(&mut pending_propose); - let verify_wait = Waiter(&mut pending_verify); - let certify_wait = certify_pool.next_completed(); - - // Wait for a timeout to fire or for a message to arrive - let timeout = self.state.next_timeout_deadline(); - let start = self.state.current_view(); - let mut resolved = Resolved::None; - let view; - select! 
{ - _ = &mut shutdown => { - debug!("context shutdown, stopping voter"); - - // Sync and drop journal - self.journal.take().unwrap().sync_all().await.expect("unable to sync journal"); - - // Only drop shutdown once journal is synced - drop(shutdown); - return; - }, - _ = self.context.sleep_until(timeout) => { - // Trigger the timeout - self.handle_timeout(&mut batcher, &mut vote_sender, &mut certificate_sender).await; - view = self.state.current_view(); - }, - (context, proposed) = propose_wait => { - // Clear propose waiter - pending_propose = None; - - // Try to use result - let proposed = match proposed { - Ok(proposed) => proposed, - Err(err) => { - debug!(?err, round = ?context.round, "failed to propose container"); - continue; - } - }; + // Prepare waiters + let propose_wait = Waiter(&mut pending_propose); + let verify_wait = Waiter(&mut pending_verify); + let certify_wait = certify_pool.next_completed(); + + // Wait for a timeout to fire or for a message to arrive + let timeout = self.state.next_timeout_deadline(); + let start = self.state.current_view(); + let mut resolved = Resolved::None; + let view; + }, + on_stopped => { + debug!("context shutdown, stopping voter"); - // If we have already moved to another view, drop the response as we will - // not broadcast it - let our_round = Rnd::new(self.state.epoch(), self.state.current_view()); - if our_round != context.round { - debug!(round = ?context.round, ?our_round, "dropping requested proposal"); + // Sync and drop journal + self.journal.take().unwrap().sync_all().await.expect("unable to sync journal"); + }, + _ = self.context.sleep_until(timeout) => { + // Trigger the timeout + self.handle_timeout(&mut batcher, &mut vote_sender, &mut certificate_sender).await; + view = self.state.current_view(); + }, + (context, proposed) = propose_wait => { + // Clear propose waiter + pending_propose = None; + + // Try to use result + let proposed = match proposed { + Ok(proposed) => proposed, + Err(err) => { + debug!(?err, round = ?context.round, "failed to propose container"); continue; } + }; + + // If we have already moved to another view, drop the response as we will + // not broadcast it + let our_round = Rnd::new(self.state.epoch(), self.state.current_view()); + if our_round != context.round { + debug!(round = ?context.round, ?our_round, "dropping requested proposal"); + continue; + } - // Construct proposal - let proposal = Proposal::new( - context.round, - context.parent.0, - proposed, - ); - if !self.state.proposed(proposal) { - warn!(round = ?context.round, "dropped our proposal"); - continue; + // Construct proposal + let proposal = Proposal::new( + context.round, + context.parent.0, + proposed, + ); + if !self.state.proposed(proposal) { + warn!(round = ?context.round, "dropped our proposal"); + continue; + } + view = self.state.current_view(); + + // Notify application of proposal + self.relay.broadcast(proposed).await; + }, + (context, verified) = verify_wait => { + // Clear verify waiter + pending_verify = None; + + // Try to use result + view = context.view(); + match verified { + Ok(true) => { + // Mark verification complete + self.state.verified(view); + }, + Ok(false) => { + // Verification failed for current view proposal, treat as immediate timeout + debug!(round = ?context.round, "proposal failed verification"); + self.handle_timeout(&mut batcher, &mut vote_sender, &mut certificate_sender) + .await; + }, + Err(err) => { + debug!(?err, round = ?context.round, "failed to verify proposal"); } - view = 
self.state.current_view(); - - // Notify application of proposal - self.relay.broadcast(proposed).await; - }, - (context, verified) = verify_wait => { - // Clear verify waiter - pending_verify = None; - - // Try to use result - view = context.view(); - match verified { - Ok(true) => { - // Mark verification complete - self.state.verified(view); - }, - Ok(false) => { - // Verification failed for current view proposal, treat as immediate timeout - debug!(round = ?context.round, "proposal failed verification"); - self.handle_timeout(&mut batcher, &mut vote_sender, &mut certificate_sender) + }; + }, + result = certify_wait => { + // Aborted futures are expected when old views are pruned. + let Ok((round, certified)) = result else { + continue; + }; + + // Handle response to our certification request. + view = round.view(); + match certified { + Ok(certified) => { + let Some(notarization) = self.handle_certification(view, certified).await + else { + continue; + }; + resolver.certified(view, certified).await; + if certified { + self.reporter + .report(Activity::Certification(notarization)) .await; - }, - Err(err) => { - debug!(?err, round = ?context.round, "failed to verify proposal"); } - }; - }, - result = certify_wait => { - // Aborted futures are expected when old views are pruned. - let Ok((round, certified)) = result else { - continue; - }; - - // Handle response to our certification request. - view = round.view(); - match certified { - Ok(certified) => { - let Some(notarization) = self.handle_certification(view, certified).await - else { - continue; - }; - resolver.certified(view, certified).await; - if certified { - self.reporter - .report(Activity::Certification(notarization)) - .await; - } + } + Err(err) => { + // Unlike propose/verify (where failing to act will lead to a timeout + // and subsequent nullification), failing to certify can lead to a halt + // because we'll never exit the view without a notarization + certification. + // + // We do not assume failure here because certification results are persisted + // to the journal and will be recovered on restart. + debug!(?err, ?round, "failed to certify proposal"); + } + }; + }, + mailbox = self.mailbox_receiver.next() => { + // Extract message + let Some(msg) = mailbox else { + break; + }; + + // Handle messages from resolver and batcher + match msg { + Message::Proposal(proposal) => { + view = proposal.view(); + if !self.state.is_interesting(view, false) { + trace!(%view, "proposal is not interesting"); + continue; } - Err(err) => { - // Unlike propose/verify (where failing to act will lead to a timeout - // and subsequent nullification), failing to certify can lead to a halt - // because we'll never exit the view without a notarization + certification. - // - // We do not assume failure here because certification results are persisted - // to the journal and will be recovered on restart. 
- debug!(?err, ?round, "failed to certify proposal"); + trace!(%view, "received proposal"); + if !self.state.set_proposal(view, proposal) { + continue; } - }; - }, - mailbox = self.mailbox_receiver.next() => { - // Extract message - let Some(msg) = mailbox else { - break; - }; - - // Handle messages from resolver and batcher - match msg { - Message::Proposal(proposal) => { - view = proposal.view(); - if !self.state.is_interesting(view, false) { - trace!(%view, "proposal is not interesting"); - continue; - } - trace!(%view, "received proposal"); - if !self.state.set_proposal(view, proposal) { - continue; - } + } + Message::Verified(certificate, from_resolver) => { + // Certificates can come from future views (they advance our view) + view = certificate.view(); + if !self.state.is_interesting(view, true) { + trace!(%view, "certificate is not interesting"); + continue; } - Message::Verified(certificate, from_resolver) => { - // Certificates can come from future views (they advance our view) - view = certificate.view(); - if !self.state.is_interesting(view, true) { - trace!(%view, "certificate is not interesting"); - continue; - } - // Track resolved status to avoid sending back to resolver - match certificate { - Certificate::Notarization(notarization) => { - trace!(%view, from_resolver, "received notarization"); - self.handle_notarization(notarization).await; - if from_resolver { - resolved = Resolved::Notarization; - } + // Track resolved status to avoid sending back to resolver + match certificate { + Certificate::Notarization(notarization) => { + trace!(%view, from_resolver, "received notarization"); + self.handle_notarization(notarization).await; + if from_resolver { + resolved = Resolved::Notarization; } - Certificate::Nullification(nullification) => { - trace!(%view, from_resolver, "received nullification"); - if let Some(floor) = self.handle_nullification(nullification).await { - warn!(?floor, "broadcasting nullification floor"); - self.broadcast_certificate(&mut certificate_sender, floor) - .await; - } - if from_resolver { - resolved = Resolved::Nullification; - } + } + Certificate::Nullification(nullification) => { + trace!(%view, from_resolver, "received nullification"); + if let Some(floor) = self.handle_nullification(nullification).await { + warn!(?floor, "broadcasting nullification floor"); + self.broadcast_certificate(&mut certificate_sender, floor) + .await; } - Certificate::Finalization(finalization) => { - trace!(%view, from_resolver, "received finalization"); - self.handle_finalization(finalization).await; - if from_resolver { - resolved = Resolved::Finalization; - } + if from_resolver { + resolved = Resolved::Nullification; + } + } + Certificate::Finalization(finalization) => { + trace!(%view, from_resolver, "received finalization"); + self.handle_finalization(finalization).await; + if from_resolver { + resolved = Resolved::Finalization; } } } } - }, - }; - - // Attempt to send any new view messages - // - // The batcher may drop votes we construct here if it has not yet been updated to the - // message's view. This only happens when we skip ahead multiple views, which always - // coincides with entering a new view (triggering a batcher update below before we send - // any votes for the new current view). This has no impact on liveness, however, we may miss - // building a finalization for an old view where we otherwise could have contributed. 
- self.notify( - &mut batcher, - &mut resolver, - &mut vote_sender, - &mut certificate_sender, - view, - resolved, - ) - .await; + } + }, + on_end => { + // Attempt to send any new view messages + // + // The batcher may drop votes we construct here if it has not yet been updated to the + // message's view. This only happens when we skip ahead multiple views, which always + // coincides with entering a new view (triggering a batcher update below before we send + // any votes for the new current view). This has no impact on liveness, however, we may miss + // building a finalization for an old view where we otherwise could have contributed. + self.notify( + &mut batcher, + &mut resolver, + &mut vote_sender, + &mut certificate_sender, + view, + resolved, + ) + .await; - // After sending all required messages, prune any views - // we no longer need - self.prune_views().await; - - // Update the batcher if we have moved to a new view - let current_view = self.state.current_view(); - if current_view > start { - let leader = self - .state - .leader_index(current_view) - .expect("leader not set"); - - // If the leader is not active (and not us), we should reduce leader timeout to now - let is_active = batcher - .update(current_view, leader, self.state.last_finalized()) - .await; - if !is_active && !self.state.is_me(leader) { - debug!(%view, %leader, "skipping leader timeout due to inactivity"); - self.state.expire_round(current_view); + // After sending all required messages, prune any views + // we no longer need + self.prune_views().await; + + // Update the batcher if we have moved to a new view + let current_view = self.state.current_view(); + if current_view > start { + let leader = self + .state + .leader_index(current_view) + .expect("leader not set"); + + // If the leader is not active (and not us), we should reduce leader timeout to now + let is_active = batcher + .update(current_view, leader, self.state.last_finalized()) + .await; + if !is_active && !self.state.is_me(leader) { + debug!(%view, %leader, "skipping leader timeout due to inactivity"); + self.state.expire_round(current_view); + } } - } + }, } } } diff --git a/macros/src/lib.rs b/macros/src/lib.rs index 049ea013e7..3873ca6952 100644 --- a/macros/src/lib.rs +++ b/macros/src/lib.rs @@ -12,7 +12,7 @@ use proc_macro_crate::{crate_name, FoundCrate}; use quote::{format_ident, quote, ToTokens}; use syn::{ parse::{Parse, ParseStream, Result}, - parse_macro_input, Block, Error, Expr, Ident, ItemFn, LitStr, Pat, Token, + parse_macro_input, Error, Expr, Ident, ItemFn, LitStr, Pat, Token, }; mod nextest; @@ -304,7 +304,7 @@ struct SelectInput { struct Branch { pattern: Pat, future: Expr, - block: Block, + body: Expr, } impl Parse for SelectInput { @@ -316,12 +316,12 @@ impl Parse for SelectInput { input.parse::()?; let future: Expr = input.parse()?; input.parse::]>()?; - let block: Block = input.parse()?; + let body: Expr = input.parse()?; branches.push(Branch { pattern, future, - block, + body, }); if input.peek(Token![,]) { @@ -340,10 +340,10 @@ impl ToTokens for SelectInput { for branch in &self.branches { let pattern = &branch.pattern; let future = &branch.future; - let block = &branch.block; + let body = &branch.body; tokens.extend(quote! { - #pattern = #future => #block, + #pattern = #future => #body, }); } } @@ -392,12 +392,12 @@ pub fn select(input: TokenStream) -> TokenStream { for Branch { pattern, future, - block, + body, } in branches.into_iter() { // Generate branch for `select_biased!` macro let branch_code = quote! 
{ - #pattern = (#future).fuse() => #block, + #pattern = (#future).fuse() => #body, }; select_branches.push(branch_code); } @@ -417,11 +417,13 @@ pub fn select(input: TokenStream) -> TokenStream { /// Input for [select_loop!]. /// -/// Parses: `context, on_stopped => { block }, { branches... }` +/// Parses: `context, [on_start => expr,] on_stopped => expr, branches... [, on_end => expr]` struct SelectLoopInput { context: Expr, - shutdown_block: Block, + start_expr: Option, + shutdown_expr: Expr, branches: Vec, + end_expr: Option, } impl Parse for SelectLoopInput { @@ -430,6 +432,22 @@ impl Parse for SelectLoopInput { let context: Expr = input.parse()?; input.parse::()?; + // Check for optional `on_start =>` + let start_expr = if input.peek(Ident) { + let ident: Ident = input.fork().parse()?; + if ident == "on_start" { + input.parse::()?; // consume the ident + input.parse::]>()?; + let expr: Expr = input.parse()?; + input.parse::()?; + Some(expr) + } else { + None + } + } else { + None + }; + // Parse `on_stopped =>` let on_stopped_ident: Ident = input.parse()?; if on_stopped_ident != "on_stopped" { @@ -440,25 +458,34 @@ impl Parse for SelectLoopInput { } input.parse::]>()?; - // Parse shutdown block - let shutdown_block: Block = input.parse()?; + // Parse shutdown expression + let shutdown_expr: Expr = input.parse()?; - // Parse comma after shutdown block + // Parse comma after shutdown expression input.parse::()?; // Parse branches directly (no surrounding braces) + // Stop when we see `on_end` or reach end of input let mut branches = Vec::new(); while !input.is_empty() { + // Check if next token is `on_end` + if input.peek(Ident) { + let ident: Ident = input.fork().parse()?; + if ident == "on_end" { + break; + } + } + let pattern = Pat::parse_single(input)?; input.parse::()?; let future: Expr = input.parse()?; input.parse::]>()?; - let block: Block = input.parse()?; + let body: Expr = input.parse()?; branches.push(Branch { pattern, future, - block, + body, }); if input.peek(Token![,]) { @@ -468,10 +495,29 @@ impl Parse for SelectLoopInput { } } + // Check for optional `on_end =>` + let end_expr = if !input.is_empty() && input.peek(Ident) { + let ident: Ident = input.parse()?; + if ident == "on_end" { + input.parse::]>()?; + let expr: Expr = input.parse()?; + if input.peek(Token![,]) { + input.parse::()?; + } + Some(expr) + } else { + return Err(Error::new(ident.span(), "expected `on_end` keyword")); + } + } else { + None + }; + Ok(Self { context, - shutdown_block, + start_expr, + shutdown_expr, branches, + end_expr, }) } } @@ -492,12 +538,26 @@ impl Parse for SelectLoopInput { /// ```rust,ignore /// select_loop! { /// context, +/// on_start => { /* optional: runs at start of each iteration */ }, /// on_stopped => { cleanup }, /// pattern = future => block, /// // ... +/// on_end => { /* optional: runs after non-shutdown arm completes */ }, /// } /// ``` /// +/// The order of blocks matches execution order: +/// 1. `on_start` (optional) - Runs at the start of each loop iteration, before the select. +/// Can use `continue` to skip the select or `break` to exit the loop. +/// 2. `on_stopped` (required) - The shutdown handler, executed when shutdown is signaled. +/// 3. Select arms - The futures to select over. +/// 4. `on_end` (optional) - Runs after a non-shutdown arm completes. Skipped when shutdown +/// is triggered. Useful for post-processing that should happen after each arm. +/// +/// All blocks share the same lexical scope within the loop body. 
Variables declared in +/// `on_start` are visible in the select arms, `on_stopped`, and `on_end`. This allows +/// preparing state in `on_start` and using it throughout the iteration. +/// /// The `shutdown` variable (the future from `context.stopped()`) is accessible in the /// shutdown block, allowing explicit cleanup such as `drop(shutdown)` before breaking or returning. /// @@ -507,15 +567,25 @@ impl Parse for SelectLoopInput { /// use commonware_macros::select_loop; /// /// async fn run(context: impl commonware_runtime::Spawner) { +/// let mut counter = 0; /// select_loop! { /// context, +/// on_start => { +/// // Prepare state for this iteration (visible in arms and on_end) +/// let start_time = std::time::Instant::now(); +/// counter += 1; +/// }, /// on_stopped => { -/// println!("shutting down"); +/// println!("shutting down after {} iterations", counter); /// drop(shutdown); /// }, /// msg = receiver.recv() => { /// println!("received: {:?}", msg); /// }, +/// on_end => { +/// // Access variables from on_start +/// println!("iteration took {:?}", start_time.elapsed()); +/// }, /// } /// } /// ``` @@ -523,8 +593,10 @@ impl Parse for SelectLoopInput { pub fn select_loop(input: TokenStream) -> TokenStream { let SelectLoopInput { context, - shutdown_block, + start_expr, + shutdown_expr, branches, + end_expr, } = parse_macro_input!(input as SelectLoopInput); // Convert branches to tokens for the inner select! @@ -533,18 +605,37 @@ pub fn select_loop(input: TokenStream) -> TokenStream { .map(|b| { let pattern = &b.pattern; let future = &b.future; - let block = &b.block; - quote! { #pattern = #future => #block, } + let body = &b.body; + quote! { #pattern = #future => #body, } }) .collect(); + // Helper to convert an expression to tokens, inlining block contents + // to preserve variable scope + fn expr_to_tokens(expr: &Expr) -> proc_macro2::TokenStream { + match expr { + Expr::Block(block) => { + let stmts = &block.block.stmts; + quote! { #(#stmts)* } + } + other => quote! { #other; }, + } + } + + // Generate on_start and on_end tokens if present + let on_start_tokens = start_expr.as_ref().map(expr_to_tokens); + let on_end_tokens = end_expr.as_ref().map(expr_to_tokens); + let shutdown_tokens = expr_to_tokens(&shutdown_expr); + quote! { { let mut shutdown = #context.stopped(); loop { + #on_start_tokens + commonware_macros::select! { _ = &mut shutdown => { - #shutdown_block + #shutdown_tokens // Break the loop after handling shutdown. Some implementations // may divert control flow themselves, so this may be unused. @@ -553,6 +644,8 @@ pub fn select_loop(input: TokenStream) -> TokenStream { }, #(#branch_tokens)* } + + #on_end_tokens } } } diff --git a/macros/tests/select.rs b/macros/tests/select.rs index d731ce3cd3..4ca9695838 100644 --- a/macros/tests/select.rs +++ b/macros/tests/select.rs @@ -178,4 +178,235 @@ mod tests { assert_eq!(results, vec![100, 200, 1, 2]); }); } + + #[test] + fn test_select_loop_lifecycle_hooks() { + block_on(async move { + let (mut tx, mut rx) = mpsc::unbounded(); + tx.send(10).await.unwrap(); + tx.send(20).await.unwrap(); + drop(tx); + + let mut received = Vec::new(); + let mut start_count = 0; + let mut end_count = 0; + let mock_context = MockSignaler; + select_loop! 
{ + mock_context, + on_start => { + start_count += 1; + }, + on_stopped => {}, + msg = rx.next() => { + match msg { + Some(v) => received.push(v), + None => break, + } + }, + on_end => { + end_count += 1; + }, + } + assert_eq!(received, vec![10, 20]); + // on_start runs before each iteration: 2 messages + 1 for the None + assert_eq!(start_count, 3); + // on_end runs after each arm but not after break: 2 messages + assert_eq!(end_count, 2); + }); + } + + #[test] + fn test_select_loop_on_start_continue() { + block_on(async move { + let (mut tx, mut rx) = mpsc::unbounded(); + tx.send(1).await.unwrap(); + tx.send(2).await.unwrap(); + tx.send(3).await.unwrap(); + drop(tx); + + let mut received = Vec::new(); + let mut start_count = 0; + let mut end_count = 0; + let mut skip_count = 0; + let mock_context = MockSignaler; + select_loop! { + mock_context, + on_start => { + start_count += 1; + // Skip the first iteration (don't run select) + if start_count == 1 { + skip_count += 1; + continue; + } + }, + on_stopped => {}, + msg = rx.next() => { + match msg { + Some(v) => received.push(v), + None => break, + } + }, + on_end => { + end_count += 1; + }, + } + // All messages received (continue just skips one iteration, not a message) + assert_eq!(received, vec![1, 2, 3]); + // on_start runs 5 times: 1 (skip), 2 (recv 1), 3 (recv 2), 4 (recv 3), 5 (None) + assert_eq!(start_count, 5); + // on_end runs 3 times (once per message, not on skip or break) + assert_eq!(end_count, 3); + // We skipped once + assert_eq!(skip_count, 1); + }); + } + + #[test] + fn test_select_loop_on_end_not_called_on_shutdown() { + block_on(async move { + let (mut tx, mut rx) = mpsc::unbounded(); + tx.send(1).await.unwrap(); + drop(tx); + + let mut end_count = 0; + #[allow(unused)] + let mut did_shutdown = false; + + let mock_context = MockSignalerResolves; + select_loop! { + mock_context, + on_stopped => { + did_shutdown = true; + }, + _ = rx.next() => { + // sink msg + }, + on_end => { + end_count += 1; + }, + } + + assert!(did_shutdown); + // on_end should NOT be called when shutdown triggers + assert_eq!(end_count, 0); + }); + } + + #[test] + fn test_select_loop_on_start_variable_visibility() { + block_on(async move { + let (mut tx, mut rx) = mpsc::unbounded(); + tx.send(5).await.unwrap(); + tx.send(3).await.unwrap(); + drop(tx); + + let mut results = Vec::new(); + let mock_context = MockSignaler; + select_loop! { + mock_context, + on_start => { + // Declare a variable in on_start + let multiplier = 10; + }, + on_stopped => {}, + msg = rx.next() => { + match msg { + // Use the variable from on_start in the select arm + Some(v) => results.push(v * multiplier), + None => break, + } + }, + on_end => { + // Use the variable from on_start in on_end + results.push(multiplier); + }, + } + // First iteration: receive 5, push 5*10=50, then on_end pushes 10 + // Second iteration: receive 3, push 3*10=30, then on_end pushes 10 + // Third iteration: receive None, break (on_end not called) + assert_eq!(results, vec![50, 10, 30, 10]); + }); + } + + #[test] + fn test_select_braceless_expressions() { + block_on(async move { + let mut results = Vec::new(); + + // Test braceless assignment expression + let (mut tx1, mut rx1) = mpsc::unbounded::(); + tx1.send(42).await.unwrap(); + drop(tx1); + + #[allow(unused_assignments)] + let mut result = 0; + select! 
{ + msg = rx1.next() => result = msg.unwrap_or(0), + } + assert_eq!(result, 42); + + // Test braceless method call + let (mut tx2, mut rx2) = mpsc::unbounded(); + tx2.send(100).await.unwrap(); + drop(tx2); + + select! { + msg = rx2.next() => results.push(msg.unwrap()), + } + + // Test braced syntax still works + let (mut tx3, mut rx3) = mpsc::unbounded(); + tx3.send(1).await.unwrap(); + drop(tx3); + + select! { + msg = rx3.next() => if let Some(v) = msg { + results.push(v); + }, + } + + assert_eq!(results, vec![100, 1]); + }); + } + + #[test] + fn test_select_loop_braceless_syntax() { + block_on(async move { + // Test all braceless: on_start, on_stopped, branch, on_end + let (mut tx, mut rx) = mpsc::unbounded(); + tx.send(10).await.unwrap(); + tx.send(20).await.unwrap(); + drop(tx); + + let mut start_count = 0; + let mut end_count = 0; + let mut received = Vec::new(); + let mock_context = MockSignaler; + select_loop! { + mock_context, + on_start => start_count += 1, + on_stopped => {}, + msg = rx.next() => match msg { + Some(v) => received.push(v), + None => break, + }, + on_end => end_count += 1, + } + assert_eq!(received, vec![10, 20]); + assert_eq!(start_count, 3); // 2 messages + None + assert_eq!(end_count, 2); // 2 messages, not break + + // Test braceless on_stopped with immediate shutdown + let (_tx2, mut rx2) = mpsc::unbounded::(); + #[allow(unused_assignments)] + let mut did_shutdown = false; + let mock_context2 = MockSignalerResolves; + select_loop! { + mock_context2, + on_stopped => did_shutdown = true, + _ = rx2.next() => {}, + } + assert!(did_shutdown); + }); + } } diff --git a/resolver/src/p2p/engine.rs b/resolver/src/p2p/engine.rs index 6a26e5cfda..640f8f80ed 100644 --- a/resolver/src/p2p/engine.rs +++ b/resolver/src/p2p/engine.rs @@ -7,7 +7,7 @@ use super::{ use crate::Consumer; use bytes::Bytes; use commonware_cryptography::PublicKey; -use commonware_macros::select; +use commonware_macros::select_loop; use commonware_p2p::{ utils::codec::{wrap, WrappedSender}, Blocker, Manager, Receiver, Recipients, Sender, @@ -154,211 +154,202 @@ impl< /// Inner run loop called by `start`. async fn run(mut self, network: (NetS, NetR)) { - let mut shutdown = self.context.stopped(); let peer_set_subscription = &mut self.manager.subscribe().await; // Wrap channel let (mut sender, mut receiver) = wrap((), network.0, network.1); - loop { - // Update metrics - let _ = self - .metrics - .fetch_pending - .try_set(self.fetcher.len_pending()); - let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active()); - let _ = self - .metrics - .peers_blocked - .try_set(self.fetcher.len_blocked()); - let _ = self.metrics.serve_processing.try_set(self.serves.len()); - - // Get retry timeout (if any) - let deadline_pending = match self.fetcher.get_pending_deadline() { - Some(deadline) => Either::Left(self.context.sleep_until(deadline)), - None => Either::Right(future::pending()), - }; - - // Get requester timeout (if any) - let deadline_active = match self.fetcher.get_active_deadline() { - Some(deadline) => Either::Left(self.context.sleep_until(deadline)), - None => Either::Right(future::pending()), - }; - - // Handle shutdown signal - select! { - _ = &mut shutdown => { - debug!("shutdown"); - self.serves.cancel_all(); + select_loop! 
{ + self.context, + on_start => { + // Update metrics + let _ = self + .metrics + .fetch_pending + .try_set(self.fetcher.len_pending()); + let _ = self.metrics.fetch_active.try_set(self.fetcher.len_active()); + let _ = self + .metrics + .peers_blocked + .try_set(self.fetcher.len_blocked()); + let _ = self.metrics.serve_processing.try_set(self.serves.len()); + + // Get retry timeout (if any) + let deadline_pending = match self.fetcher.get_pending_deadline() { + Some(deadline) => Either::Left(self.context.sleep_until(deadline)), + None => Either::Right(future::pending()), + }; + + // Get requester timeout (if any) + let deadline_active = match self.fetcher.get_active_deadline() { + Some(deadline) => Either::Left(self.context.sleep_until(deadline)), + None => Either::Right(future::pending()), + }; + }, + on_stopped => { + debug!("shutdown"); + self.serves.cancel_all(); + }, + // Handle peer set updates + peer_set_update = peer_set_subscription.next() => { + let Some((id, _, all)) = peer_set_update else { + debug!("peer set subscription closed"); return; - }, - - // Handle peer set updates - peer_set_update = peer_set_subscription.next() => { - let Some((id, _, all)) = peer_set_update else { - debug!("peer set subscription closed"); - return; - }; - - // Instead of directing our requests to exclusively the latest set (which may still be syncing, we - // reconcile with all tracked peers). - if self.last_peer_set_id < Some(id) { - self.last_peer_set_id = Some(id); - self.fetcher.reconcile(all.as_ref()); - } - }, - - // Handle active deadline - _ = deadline_active => { - if let Some(key) = self.fetcher.pop_active() { - debug!(?key, "requester timeout"); - self.metrics.fetch.inc(Status::Failure); - self.fetcher.add_retry(key); - } - }, - - // Handle pending deadline - _ = deadline_pending => { - self.fetcher.fetch(&mut sender).await; - }, - - // Handle mailbox messages - msg = self.mailbox.next() => { - let Some(msg) = msg else { - error!("mailbox closed"); - return; - }; - match msg { - Message::Fetch(requests) => { - for FetchRequest { key, targets } in requests { - trace!(?key, "mailbox: fetch"); - - // Check if the fetch is already in progress - let is_new = !self.fetch_timers.contains_key(&key); - - // Update targets - match targets { - Some(targets) => { - // Only add targets if this is a new fetch OR the existing - // fetch already has targets. Don't restrict an "all" fetch - // (no targets) to specific targets. - if is_new || self.fetcher.has_targets(&key) { - self.fetcher.add_targets(key.clone(), targets); - } + }; + + // Instead of directing our requests exclusively to the latest set (which may still be syncing), we + // reconcile with all tracked peers.
+ if self.last_peer_set_id < Some(id) { + self.last_peer_set_id = Some(id); + self.fetcher.reconcile(all.as_ref()); + } + }, + // Handle active deadline + _ = deadline_active => { + if let Some(key) = self.fetcher.pop_active() { + debug!(?key, "requester timeout"); + self.metrics.fetch.inc(Status::Failure); + self.fetcher.add_retry(key); + } + }, + // Handle pending deadline + _ = deadline_pending => { + self.fetcher.fetch(&mut sender).await; + }, + // Handle mailbox messages + msg = self.mailbox.next() => { + let Some(msg) = msg else { + error!("mailbox closed"); + return; + }; + match msg { + Message::Fetch(requests) => { + for FetchRequest { key, targets } in requests { + trace!(?key, "mailbox: fetch"); + + // Check if the fetch is already in progress + let is_new = !self.fetch_timers.contains_key(&key); + + // Update targets + match targets { + Some(targets) => { + // Only add targets if this is a new fetch OR the existing + // fetch already has targets. Don't restrict an "all" fetch + // (no targets) to specific targets. + if is_new || self.fetcher.has_targets(&key) { + self.fetcher.add_targets(key.clone(), targets); } - None => self.fetcher.clear_targets(&key), - } - - // Only start new fetch if not already in progress - if is_new { - self.fetch_timers.insert(key.clone(), self.metrics.fetch_duration.timer()); - self.fetcher.add_ready(key); - } else { - trace!(?key, "updated targets for existing fetch"); } - } - } - Message::Cancel { key } => { - trace!(?key, "mailbox: cancel"); - let mut guard = self.metrics.cancel.guard(Status::Dropped); - if self.fetcher.cancel(&key) { - guard.set(Status::Success); - self.fetch_timers.remove(&key).unwrap().cancel(); // must exist, don't record metric - self.consumer.failed(key.clone(), ()).await; - } - } - Message::Retain { predicate } => { - trace!("mailbox: retain"); - - // Remove from fetcher - self.fetcher.retain(&predicate); - - // Clean up timers and notify consumer - let before = self.fetch_timers.len(); - let removed = self.fetch_timers.extract_if(|k, _| !predicate(k)).collect::>(); - for (key, timer) in removed { - timer.cancel(); - self.consumer.failed(key, ()).await; + None => self.fetcher.clear_targets(&key), } - // Metrics - let removed = (before - self.fetch_timers.len()) as u64; - if removed == 0 { - self.metrics.cancel.inc(Status::Dropped); + // Only start new fetch if not already in progress + if is_new { + self.fetch_timers.insert(key.clone(), self.metrics.fetch_duration.timer()); + self.fetcher.add_ready(key); } else { - self.metrics.cancel.inc_by(Status::Success, removed); + trace!(?key, "updated targets for existing fetch"); } } - Message::Clear => { - trace!("mailbox: clear"); + } + Message::Cancel { key } => { + trace!(?key, "mailbox: cancel"); + let mut guard = self.metrics.cancel.guard(Status::Dropped); + if self.fetcher.cancel(&key) { + guard.set(Status::Success); + self.fetch_timers.remove(&key).unwrap().cancel(); // must exist, don't record metric + self.consumer.failed(key.clone(), ()).await; + } + } + Message::Retain { predicate } => { + trace!("mailbox: retain"); - // Clear fetcher - self.fetcher.clear(); + // Remove from fetcher + self.fetcher.retain(&predicate); - // Drain timers and notify consumer - let removed = self.fetch_timers.len() as u64; - for (key, timer) in self.fetch_timers.drain() { - timer.cancel(); - self.consumer.failed(key, ()).await; - } + // Clean up timers and notify consumer + let before = self.fetch_timers.len(); + let removed = self.fetch_timers.extract_if(|k, _| !predicate(k)).collect::>(); + 
for (key, timer) in removed { + timer.cancel(); + self.consumer.failed(key, ()).await; + } - // Metrics - if removed == 0 { - self.metrics.cancel.inc(Status::Dropped); - } else { - self.metrics.cancel.inc_by(Status::Success, removed); - } + // Metrics + let removed = (before - self.fetch_timers.len()) as u64; + if removed == 0 { + self.metrics.cancel.inc(Status::Dropped); + } else { + self.metrics.cancel.inc_by(Status::Success, removed); } } - assert_eq!(self.fetcher.len(), self.fetch_timers.len()); - }, + Message::Clear => { + trace!("mailbox: clear"); - // Handle completed server requests - serve = self.serves.next_completed() => { - let Serve { timer, peer, id, result } = serve; + // Clear fetcher + self.fetcher.clear(); - // Metrics and logs - match result { - Ok(_) => { - self.metrics.serve.inc(Status::Success); - } - Err(err) => { - debug!(?err, ?peer, ?id, "serve failed"); + // Drain timers and notify consumer + let removed = self.fetch_timers.len() as u64; + for (key, timer) in self.fetch_timers.drain() { timer.cancel(); - self.metrics.serve.inc(Status::Failure); + self.consumer.failed(key, ()).await; } - } - // Send response to peer - self.handle_serve(&mut sender, peer, id, result, self.priority_responses).await; - }, - - // Handle network messages - msg = receiver.recv() => { - // Break if the receiver is closed - let (peer, msg) = match msg { - Ok(msg) => msg, - Err(err) => { - error!(?err, "receiver closed"); - return; + // Metrics + if removed == 0 { + self.metrics.cancel.inc(Status::Dropped); + } else { + self.metrics.cancel.inc_by(Status::Success, removed); } - }; - - // Skip if there is a decoding error - let msg = match msg { - Ok(msg) => msg, - Err(err) => { - trace!(?err, ?peer, "decode failed"); - continue; - } - }; - match msg.payload { - wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key).await, - wire::Payload::Response(response) => self.handle_network_response(peer, msg.id, response).await, - wire::Payload::Error => self.handle_network_error_response(peer, msg.id).await, - }; - }, - } + } + } + assert_eq!(self.fetcher.len(), self.fetch_timers.len()); + }, + // Handle completed server requests + serve = self.serves.next_completed() => { + let Serve { timer, peer, id, result } = serve; + + // Metrics and logs + match result { + Ok(_) => { + self.metrics.serve.inc(Status::Success); + } + Err(err) => { + debug!(?err, ?peer, ?id, "serve failed"); + timer.cancel(); + self.metrics.serve.inc(Status::Failure); + } + } + + // Send response to peer + self.handle_serve(&mut sender, peer, id, result, self.priority_responses).await; + }, + // Handle network messages + msg = receiver.recv() => { + // Break if the receiver is closed + let (peer, msg) = match msg { + Ok(msg) => msg, + Err(err) => { + error!(?err, "receiver closed"); + return; + } + }; + + // Skip if there is a decoding error + let msg = match msg { + Ok(msg) => msg, + Err(err) => { + trace!(?err, ?peer, "decode failed"); + continue; + } + }; + match msg.payload { + wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key).await, + wire::Payload::Response(response) => self.handle_network_response(peer, msg.id, response).await, + wire::Payload::Error => self.handle_network_error_response(peer, msg.id).await, + }; + }, } }
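For reviewers tracing the new control flow, the following is a rough sketch of what a `select_loop!` invocation expands to, based on the `quote!` block in `macros/src/lib.rs` above. It is illustrative only: `ctx`, `step`, `cleanup`, `mailbox`, `handle`, and `finish` are placeholder names, and the literal macro output differs in hygiene and formatting.

```rust
// Approximate expansion of:
//   select_loop! {
//       ctx,
//       on_start => step(),
//       on_stopped => cleanup(),
//       msg = mailbox.next() => handle(msg),
//       on_end => finish(),
//   }
{
    let mut shutdown = ctx.stopped();
    loop {
        // `on_start` statements are inlined here, so bindings they introduce
        // stay visible to the arms, `on_stopped`, and `on_end`.
        step();

        commonware_macros::select! {
            _ = &mut shutdown => {
                cleanup();
                // Exit the loop after handling shutdown (the handler may have
                // already diverted control flow itself, e.g. via `return`).
                break;
            },
            msg = mailbox.next() => handle(msg),
        }

        // `on_end` runs only when a non-shutdown arm completes without
        // `continue` or `break`, matching the lifecycle tests above.
        finish();
    }
}
```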