diff --git a/broadcast/src/buffered/engine.rs b/broadcast/src/buffered/engine.rs index 357d4f5f49..af8019b110 100644 --- a/broadcast/src/buffered/engine.rs +++ b/broadcast/src/buffered/engine.rs @@ -158,40 +158,37 @@ impl { - let Some(msg) = mail else { - error!("mailbox receiver failed"); - break; - }; - match msg { - Message::Broadcast { - recipients, - message, - responder, - } => { - trace!("mailbox: broadcast"); - self.handle_broadcast(&mut sender, recipients, message, responder) - .await; - } - Message::Subscribe { - peer, - commitment, - digest, - responder, - } => { - trace!("mailbox: subscribe"); - self.handle_subscribe(peer, commitment, digest, responder) - .await; - } - Message::Get { - peer, - commitment, - digest, - responder, - } => { - trace!("mailbox: get"); - self.handle_get(peer, commitment, digest, responder).await; - } + Some(msg) = self.mailbox_receiver.recv() else { + error!("mailbox receiver failed"); + break; + } => match msg { + Message::Broadcast { + recipients, + message, + responder, + } => { + trace!("mailbox: broadcast"); + self.handle_broadcast(&mut sender, recipients, message, responder) + .await; + } + Message::Subscribe { + peer, + commitment, + digest, + responder, + } => { + trace!("mailbox: subscribe"); + self.handle_subscribe(peer, commitment, digest, responder) + .await; + } + Message::Get { + peer, + commitment, + digest, + responder, + } => { + trace!("mailbox: get"); + self.handle_get(peer, commitment, digest, responder).await; } }, // Handle incoming messages diff --git a/collector/src/p2p/engine.rs b/collector/src/p2p/engine.rs index 72de8b4b59..27ebc7ee57 100644 --- a/collector/src/p2p/engine.rs +++ b/collector/src/p2p/engine.rs @@ -132,52 +132,46 @@ where debug!("context shutdown, stopping engine"); }, // Command from the mailbox - command = self.mailbox.recv() => { - if let Some(command) = command { - match command { - Message::Send { - request, - recipients, - responder, - } => { - // Track commitment (if not already tracked) - let commitment = request.commitment(); - let entry = self.tracked.entry(commitment).or_insert_with(|| { - self.outstanding.inc(); - (HashSet::new(), HashSet::new()) - }); - - // Send the request to recipients - match req_tx - .send(recipients, request, self.priority_request) - .await - { - Ok(recipients) => { - entry.0.extend(recipients.iter().cloned()); - responder.send_lossy(Ok(recipients)); - } - Err(err) => { - error!(?err, ?commitment, "failed to send message"); - responder.send_lossy(Err(Error::SendFailed(err.into()))); - } + Some(command) = self.mailbox.recv() else continue => { + match command { + Message::Send { + request, + recipients, + responder, + } => { + // Track commitment (if not already tracked) + let commitment = request.commitment(); + let entry = self.tracked.entry(commitment).or_insert_with(|| { + self.outstanding.inc(); + (HashSet::new(), HashSet::new()) + }); + + // Send the request to recipients + match req_tx + .send(recipients, request, self.priority_request) + .await + { + Ok(recipients) => { + entry.0.extend(recipients.iter().cloned()); + responder.send_lossy(Ok(recipients)); } - } - Message::Cancel { commitment } => { - if self.tracked.remove(&commitment).is_none() { - debug!(?commitment, "ignoring removal of unknown commitment"); + Err(err) => { + error!(?err, ?commitment, "failed to send message"); + responder.send_lossy(Err(Error::SendFailed(err.into()))); } - let _ = self.outstanding.try_set(self.tracked.len()); } } + Message::Cancel { commitment } => { + if self.tracked.remove(&commitment).is_none() { + debug!(?commitment, "ignoring removal of unknown commitment"); + } + let _ = self.outstanding.try_set(self.tracked.len()); + } } }, // Response from a handler - ready = processed.next_completed() => { - // Error handling - let Ok((peer, reply)) = ready else { - continue; - }; + Ok((peer, reply)) = processed.next_completed() else continue => { self.responses.inc(); // Send the response diff --git a/consensus/src/aggregation/engine.rs b/consensus/src/aggregation/engine.rs index 999f155b43..e9fd6eaa07 100644 --- a/consensus/src/aggregation/engine.rs +++ b/consensus/src/aggregation/engine.rs @@ -298,13 +298,10 @@ impl< debug!("shutdown"); }, // Handle refresh epoch deadline - epoch = epoch_updates.recv() => { - // Error handling - let Some(epoch) = epoch else { - error!("epoch subscription failed"); - break; - }; - + Some(epoch) = epoch_updates.recv() else { + error!("epoch subscription failed"); + break; + } => { // Refresh the epoch debug!(current = %self.epoch, new = %epoch, "refresh epoch"); assert!(epoch >= self.epoch); diff --git a/consensus/src/marshal/actor.rs b/consensus/src/marshal/actor.rs index 00a5831fe6..2c9e069e91 100644 --- a/consensus/src/marshal/actor.rs +++ b/consensus/src/marshal/actor.rs @@ -315,11 +315,8 @@ where on_stopped => { debug!("context shutdown, stopping marshal"); }, - // Handle waiter completions first - result = waiters.next_completed() => { - let Ok((commitment, block)) = result else { - continue; // Aborted future - }; + // Handle waiter completions first (aborted futures are skipped) + Ok((commitment, block)) = waiters.next_completed() else continue => { self.notify_subscribers(commitment, &block).await; }, // Handle application acknowledgements next @@ -346,11 +343,10 @@ where } }, // Handle consensus inputs before backfill or resolver traffic - mailbox_message = self.mailbox.recv() => { - let Some(message) = mailbox_message else { - info!("mailbox closed, shutting down"); - break; - }; + Some(message) = self.mailbox.recv() else { + info!("mailbox closed, shutting down"); + break; + } => { match message { Message::GetInfo { identifier, @@ -567,11 +563,10 @@ where } }, // Handle resolver messages last - message = resolver_rx.recv() => { - let Some(message) = message else { - info!("handler closed, shutting down"); - break; - }; + Some(message) = resolver_rx.recv() else { + info!("handler closed, shutting down"); + break; + } => { match message { handler::Message::Produce { key, response } => { match key { diff --git a/consensus/src/ordered_broadcast/engine.rs b/consensus/src/ordered_broadcast/engine.rs index cd5341be4a..ea05a5e2cb 100644 --- a/consensus/src/ordered_broadcast/engine.rs +++ b/consensus/src/ordered_broadcast/engine.rs @@ -335,13 +335,10 @@ impl< debug!("shutdown"); }, // Handle refresh epoch deadline - epoch = epoch_updates.recv() => { - // Error handling - let Some(epoch) = epoch else { - error!("epoch subscription failed"); - break; - }; - + Some(epoch) = epoch_updates.recv() else { + error!("epoch subscription failed"); + break; + } => { // Refresh the epoch debug!(current = %self.epoch, new = %epoch, "refresh epoch"); assert!(epoch >= self.epoch); diff --git a/consensus/src/simplex/actors/batcher/actor.rs b/consensus/src/simplex/actors/batcher/actor.rs index 8ba459ea84..7d9b7bebf8 100644 --- a/consensus/src/simplex/actors/batcher/actor.rs +++ b/consensus/src/simplex/actors/batcher/actor.rs @@ -195,61 +195,51 @@ impl< on_stopped => { debug!("context shutdown, stopping batcher"); }, - message = self.mailbox_receiver.recv() => { - match message { - Some(Message::Update { - current: new_current, - leader, - finalized: new_finalized, - active, - }) => { - current = new_current; - finalized = new_finalized; - work.entry(current) - .or_insert_with(|| self.new_round()) - .set_leader(leader); - - // Check if the leader has been active recently - let skip_timeout = self.skip_timeout.get() as usize; - let is_active = - // Ensure we have enough data to judge activity (none of this - // data may be in the last skip_timeout views if we jumped ahead - // to a new view) - work.len() < skip_timeout - // Leader active in at least one recent round - || work.iter().rev().take(skip_timeout).any(|(_, round)| round.is_active(leader)); - active.send_lossy(is_active); - - // Setting leader may enable batch verification - updated_view = current; + Some(message) = self.mailbox_receiver.recv() else break => match message { + Message::Update { + current: new_current, + leader, + finalized: new_finalized, + active, + } => { + current = new_current; + finalized = new_finalized; + work.entry(current) + .or_insert_with(|| self.new_round()) + .set_leader(leader); + + // Check if the leader has been active recently + let skip_timeout = self.skip_timeout.get() as usize; + let is_active = + // Ensure we have enough data to judge activity (none of this + // data may be in the last skip_timeout views if we jumped ahead + // to a new view) + work.len() < skip_timeout + // Leader active in at least one recent round + || work.iter().rev().take(skip_timeout).any(|(_, round)| round.is_active(leader)); + active.send_lossy(is_active); + + // Setting leader may enable batch verification + updated_view = current; + } + Message::Constructed(message) => { + // If the view isn't interesting, we can skip + let view = message.view(); + if !interesting(self.activity_timeout, finalized, current, view, false) { + continue; } - Some(Message::Constructed(message)) => { - // If the view isn't interesting, we can skip - let view = message.view(); - if !interesting(self.activity_timeout, finalized, current, view, false) { - continue; - } - // Add the message to the verifier - work.entry(view) - .or_insert_with(|| self.new_round()) - .add_constructed(message) - .await; - self.added.inc(); - updated_view = view; - } - None => { - break; - } + // Add the message to the verifier + work.entry(view) + .or_insert_with(|| self.new_round()) + .add_constructed(message) + .await; + self.added.inc(); + updated_view = view; } }, // Handle certificates from the network - message = certificate_receiver.recv() => { - // If the channel is closed, we should exit - let Ok((sender, message)) = message else { - break; - }; - + Ok((sender, message)) = certificate_receiver.recv() else break => { // If there is a decoding error, block let Ok(message) = message else { warn!(?sender, "blocking peer for decoding error"); @@ -361,12 +351,7 @@ impl< continue; }, // Handle votes from the network - message = vote_receiver.recv() => { - // If the channel is closed, we should exit - let Ok((sender, message)) = message else { - break; - }; - + Ok((sender, message)) = vote_receiver.recv() else break => { // If there is a decoding error, block let Ok(message) = message else { warn!(?sender, "blocking peer for decoding error"); diff --git a/consensus/src/simplex/actors/resolver/actor.rs b/consensus/src/simplex/actors/resolver/actor.rs index 14d4bf268f..dcd651ee4a 100644 --- a/consensus/src/simplex/actors/resolver/actor.rs +++ b/consensus/src/simplex/actors/resolver/actor.rs @@ -130,10 +130,7 @@ impl< _ = &mut resolver_task => { break; }, - mailbox = self.mailbox_receiver.recv() => { - let Some(message) = mailbox else { - break; - }; + Some(message) = self.mailbox_receiver.recv() else break => { match message { MailboxMessage::Certificate(certificate) => { // Certificates from mailbox have no associated request view @@ -146,10 +143,7 @@ impl< } } }, - handler = handler_rx.recv() => { - let Some(message) = handler else { - break; - }; + Some(message) = handler_rx.recv() else break => { self.handle_resolver(message, &mut voter, &mut resolver) .await; }, diff --git a/consensus/src/simplex/actors/voter/actor.rs b/consensus/src/simplex/actors/voter/actor.rs index d3d9bb5e57..37dd1712db 100644 --- a/consensus/src/simplex/actors/voter/actor.rs +++ b/consensus/src/simplex/actors/voter/actor.rs @@ -912,12 +912,8 @@ impl< } }; }, - result = certify_wait => { - // Aborted futures are expected when old views are pruned. - let Ok((round, certified)) = result else { - continue; - }; - + // Aborted futures are expected when old views are pruned + Ok((round, certified)) = certify_wait else continue => { // Handle response to our certification request. view = round.view(); match certified { @@ -944,12 +940,7 @@ impl< } }; }, - mailbox = self.mailbox_receiver.recv() => { - // Extract message - let Some(msg) = mailbox else { - break; - }; - + Some(msg) = self.mailbox_receiver.recv() else break => { // Handle messages from resolver and batcher match msg { Message::Proposal(proposal) => { diff --git a/consensus/src/simplex/mocks/application.rs b/consensus/src/simplex/mocks/application.rs index f62b7e0c00..29a575f83e 100644 --- a/consensus/src/simplex/mocks/application.rs +++ b/consensus/src/simplex/mocks/application.rs @@ -334,11 +334,7 @@ impl Application on_stopped => { debug!("context shutdown, stopping application"); }, - message = self.mailbox.recv() => { - let message = match message { - Some(message) => message, - None => break, - }; + Some(message) = self.mailbox.recv() else break => { match message { Message::Genesis { epoch, response } => { let digest = self.genesis(epoch); @@ -380,9 +376,8 @@ impl Application } } }, - broadcast = self.broadcast.recv() => { + Some((digest, contents)) = self.broadcast.recv() else break => { // Record digest for future use - let (digest, contents) = broadcast.expect("broadcast closed"); seen.insert(digest, contents.clone()); // Check if we have a waiter diff --git a/examples/reshare/src/dkg/actor.rs b/examples/reshare/src/dkg/actor.rs index 6b387591f4..6c2bf6c50c 100644 --- a/examples/reshare/src/dkg/actor.rs +++ b/examples/reshare/src/dkg/actor.rs @@ -432,86 +432,83 @@ where } } }, - mailbox_msg = self.mailbox.recv() => { - let Some(mailbox_msg) = mailbox_msg else { - warn!("dkg actor mailbox closed"); - break 'actor; - }; - match mailbox_msg { - MailboxMessage::Act { response } => { - let outcome = dealer_state.as_ref().and_then(|ds| ds.finalized()); - if outcome.is_some() { - info!("including reshare outcome in proposed block"); - } - if response.send(outcome).is_err() { - warn!("dkg actor could not send response to Act"); - } + Some(mailbox_msg) = self.mailbox.recv() else { + warn!("dkg actor mailbox closed"); + break 'actor; + } => match mailbox_msg { + MailboxMessage::Act { response } => { + let outcome = dealer_state.as_ref().and_then(|ds| ds.finalized()); + if outcome.is_some() { + info!("including reshare outcome in proposed block"); + } + if response.send(outcome).is_err() { + warn!("dkg actor could not send response to Act"); + } + } + MailboxMessage::Finalized { block, response } => { + let bounds = epocher + .containing(block.height) + .expect("block height covered by epoch strategy"); + let block_epoch = bounds.epoch(); + let phase = bounds.phase(); + let relative_height = bounds.relative(); + info!(epoch = %block_epoch, relative_height = %relative_height, "processing finalized block"); + + // Skip blocks from previous epochs (can happen on restart if we + // persisted state but crashed before acknowledging) + if block_epoch < epoch { + response.acknowledge(); + continue; } - MailboxMessage::Finalized { block, response } => { - let bounds = epocher - .containing(block.height) - .expect("block height covered by epoch strategy"); - let block_epoch = bounds.epoch(); - let phase = bounds.phase(); - let relative_height = bounds.relative(); - info!(epoch = %block_epoch, relative_height = %relative_height, "processing finalized block"); - - // Skip blocks from previous epochs (can happen on restart if we - // persisted state but crashed before acknowledging) - if block_epoch < epoch { - response.acknowledge(); - continue; - } - // Process dealer log from block if present - if let Some(log) = block.log { - if let Some((dealer, dealer_log)) = log.check(&round) { - // If we see our dealing outcome in a finalized block, - // make sure to take it, so that we don't post - // it in subsequent blocks - if dealer == self_pk { - if let Some(ref mut ds) = dealer_state { - ds.take_finalized(); - } + // Process dealer log from block if present + if let Some(log) = block.log { + if let Some((dealer, dealer_log)) = log.check(&round) { + // If we see our dealing outcome in a finalized block, + // make sure to take it, so that we don't post + // it in subsequent blocks + if dealer == self_pk { + if let Some(ref mut ds) = dealer_state { + ds.take_finalized(); } - storage.append_log(epoch, dealer, dealer_log).await; } + storage.append_log(epoch, dealer, dealer_log).await; } + } - // In the first half of the epoch, continuously distribute shares - if phase == EpochPhase::Early { - if let Some(ref mut ds) = dealer_state { - Self::distribute_shares( - &self_pk, - &mut storage, - epoch, - ds, - player_state.as_mut(), - &mut round_sender, - ) - .await; - } + // In the first half of the epoch, continuously distribute shares + if phase == EpochPhase::Early { + if let Some(ref mut ds) = dealer_state { + Self::distribute_shares( + &self_pk, + &mut storage, + epoch, + ds, + player_state.as_mut(), + &mut round_sender, + ) + .await; } + } - // At or past the midpoint, finalize dealer if not already done. - if matches!(phase, EpochPhase::Midpoint | EpochPhase::Late) { - if let Some(ref mut ds) = dealer_state { - ds.finalize::(); - } + // At or past the midpoint, finalize dealer if not already done. + if matches!(phase, EpochPhase::Midpoint | EpochPhase::Late) { + if let Some(ref mut ds) = dealer_state { + ds.finalize::(); } + } - // Continue if not the last block in the epoch - if block.height != bounds.last() { - // Acknowledge block processing - response.acknowledge(); - continue; - } + // Continue if not the last block in the epoch + if block.height != bounds.last() { + // Acknowledge block processing + response.acknowledge(); + continue; + } - // Finalize the round before acknowledging - let logs = storage.logs(epoch); - let (success, next_round, next_output, next_share) = if let Some(ps) = - player_state.take() - { + // Finalize the round before acknowledging + let logs = storage.logs(epoch); + let (success, next_round, next_output, next_share) = + if let Some(ps) = player_state.take() { match ps.finalize::(logs, &Sequential) { Ok((new_output, new_share)) => ( true, @@ -537,63 +534,61 @@ where ), } }; - if success { - info!(?epoch, "epoch succeeded"); - self.successful_epochs.inc(); - - // Record reveals - let output = - next_output.as_ref().expect("output exists on success"); - let revealed = output.revealed(); - self.all_reveals.inc_by(revealed.len() as u64); - if revealed.position(&self_pk).is_some() { - self.our_reveals.inc(); - } - } else { - warn!(?epoch, "epoch failed"); - self.failed_epochs.inc(); + if success { + info!(?epoch, "epoch succeeded"); + self.successful_epochs.inc(); + + // Record reveals + let output = next_output.as_ref().expect("output exists on success"); + let revealed = output.revealed(); + self.all_reveals.inc_by(revealed.len() as u64); + if revealed.position(&self_pk).is_some() { + self.our_reveals.inc(); } - storage - .set_epoch( - epoch.next(), - EpochState { - round: next_round, - rng_seed: Summary::random(&mut self.context), - output: next_output.clone(), - share: next_share.clone(), - }, - ) - .await; - - // Acknowledge block processing before callback - response.acknowledge(); - - // Send the callback. - let update = if success { - Update::Success { - epoch, - output: next_output.expect("ceremony output exists"), + } else { + warn!(?epoch, "epoch failed"); + self.failed_epochs.inc(); + } + storage + .set_epoch( + epoch.next(), + EpochState { + round: next_round, + rng_seed: Summary::random(&mut self.context), + output: next_output.clone(), share: next_share.clone(), - } - } else { - Update::Failure { epoch } - }; - - // Exit the engine for this epoch now that the boundary is finalized - orchestrator.exit(epoch).await; - - // If the update is stop, wait forever. - if let PostUpdate::Stop = callback.on_update(update).await { - // Close the mailbox to prevent accepting any new messages - drop(self.mailbox); - // Keep running until killed to keep the orchestrator mailbox alive - info!("DKG complete; waiting for shutdown..."); - futures::future::pending::<()>().await; - break 'actor; + }, + ) + .await; + + // Acknowledge block processing before callback + response.acknowledge(); + + // Send the callback. + let update = if success { + Update::Success { + epoch, + output: next_output.expect("ceremony output exists"), + share: next_share.clone(), } - - break; + } else { + Update::Failure { epoch } + }; + + // Exit the engine for this epoch now that the boundary is finalized + orchestrator.exit(epoch).await; + + // If the update is stop, wait forever. + if let PostUpdate::Stop = callback.on_update(update).await { + // Close the mailbox to prevent accepting any new messages + drop(self.mailbox); + // Keep running until killed to keep the orchestrator mailbox alive + info!("DKG complete; waiting for shutdown..."); + futures::future::pending::<()>().await; + break 'actor; } + + break; } }, } diff --git a/examples/reshare/src/orchestrator/actor.rs b/examples/reshare/src/orchestrator/actor.rs index 0eff397ad2..536ca15728 100644 --- a/examples/reshare/src/orchestrator/actor.rs +++ b/examples/reshare/src/orchestrator/actor.rs @@ -202,13 +202,12 @@ where on_stopped => { debug!("context shutdown, stopping orchestrator"); }, - message = vote_backup.recv() => { + Some((their_epoch, (from, _))) = vote_backup.recv() else { + warn!("vote mux backup channel closed, shutting down orchestrator"); + break; + } => { // If a message is received in an unregistered sub-channel in the vote network, // ensure we have the boundary finalization. - let Some((their_epoch, (from, _))) = message else { - warn!("vote mux backup channel closed, shutting down orchestrator"); - break; - }; let their_epoch = Epoch::new(their_epoch); let Some(our_epoch) = engines.keys().last().copied() else { debug!(%their_epoch, ?from, "received message from unregistered epoch with no known epochs"); @@ -236,52 +235,48 @@ where .hint_finalized(boundary_height, NonEmptyVec::new(from)) .await; }, - transition = self.mailbox.recv() => { - let Some(transition) = transition else { - warn!("mailbox closed, shutting down orchestrator"); - break; - }; - - match transition { - Message::Enter(transition) => { - // If the epoch is already in the map, ignore. - if engines.contains_key(&transition.epoch) { - warn!(epoch = %transition.epoch, "entered existing epoch"); - continue; - } + Some(transition) = self.mailbox.recv() else { + warn!("mailbox closed, shutting down orchestrator"); + break; + } => match transition { + Message::Enter(transition) => { + // If the epoch is already in the map, ignore. + if engines.contains_key(&transition.epoch) { + warn!(epoch = %transition.epoch, "entered existing epoch"); + continue; + } - // Register the new signing scheme with the scheme provider. - let scheme = self.provider.scheme_for_epoch(&transition); - assert!(self.provider.register(transition.epoch, scheme.clone())); + // Register the new signing scheme with the scheme provider. + let scheme = self.provider.scheme_for_epoch(&transition); + assert!(self.provider.register(transition.epoch, scheme.clone())); - // Enter the new epoch. - let engine = self - .enter_epoch( - transition.epoch, - scheme, - &mut vote_mux, - &mut certificate_mux, - &mut resolver_mux, - ) - .await; - engines.insert(transition.epoch, engine); - let _ = self.latest_epoch.try_set(transition.epoch.get()); + // Enter the new epoch. + let engine = self + .enter_epoch( + transition.epoch, + scheme, + &mut vote_mux, + &mut certificate_mux, + &mut resolver_mux, + ) + .await; + engines.insert(transition.epoch, engine); + let _ = self.latest_epoch.try_set(transition.epoch.get()); - info!(epoch = %transition.epoch, "entered epoch"); - } - Message::Exit(epoch) => { - // Remove the engine and abort it. - let Some(engine) = engines.remove(&epoch) else { - warn!(%epoch, "exited non-existent epoch"); - continue; - }; - engine.abort(); + info!(epoch = %transition.epoch, "entered epoch"); + } + Message::Exit(epoch) => { + // Remove the engine and abort it. + let Some(engine) = engines.remove(&epoch) else { + warn!(%epoch, "exited non-existent epoch"); + continue; + }; + engine.abort(); - // Unregister the signing scheme for the epoch. - assert!(self.provider.unregister(&epoch)); + // Unregister the signing scheme for the epoch. + assert!(self.provider.unregister(&epoch)); - info!(%epoch, "exited epoch"); - } + info!(%epoch, "exited epoch"); } }, } diff --git a/examples/sync/src/bin/server.rs b/examples/sync/src/bin/server.rs index 1a71d5584c..f306c050d5 100644 --- a/examples/sync/src/bin/server.rs +++ b/examples/sync/src/bin/server.rs @@ -342,17 +342,15 @@ where } }, - outgoing = response_receiver.recv() => { - if let Some(response) = outgoing { - // We have a response to send to the client. - let response_data = response.encode(); - if let Err(err) = send_frame(&mut sink, response_data, MAX_MESSAGE_SIZE).await { - info!(client_addr = %client_addr, ?err, "send failed (client likely disconnected)"); - state.error_counter.inc(); - return Ok(()); - } - } else { - // Channel closed + Some(response) = response_receiver.recv() else { + // Channel closed + return Ok(()); + } => { + // We have a response to send to the client. + let response_data = response.encode(); + if let Err(err) = send_frame(&mut sink, response_data, MAX_MESSAGE_SIZE).await { + info!(client_addr = %client_addr, ?err, "send failed (client likely disconnected)"); + state.error_counter.inc(); return Ok(()); } }, diff --git a/examples/sync/src/net/io.rs b/examples/sync/src/net/io.rs index 536446ff48..9379519353 100644 --- a/examples/sync/src/net/io.rs +++ b/examples/sync/src/net/io.rs @@ -37,42 +37,34 @@ async fn run_loop( on_stopped => { debug!("context shutdown, terminating I/O task"); }, - outgoing = request_rx.recv() => match outgoing { - Some(Request { - request, - response_tx, - }) => { - let request_id = request.request_id(); - pending_requests.insert(request_id, response_tx); - let data = request.encode(); - if let Err(e) = send_frame(&mut sink, data, MAX_MESSAGE_SIZE).await { - if let Some(sender) = pending_requests.remove(&request_id) { - let _ = sender.send(Err(Error::Network(e))); - } - return; + Some(Request { + request, + response_tx, + }) = request_rx.recv() else return => { + let request_id = request.request_id(); + pending_requests.insert(request_id, response_tx); + let data = request.encode(); + if let Err(e) = send_frame(&mut sink, data, MAX_MESSAGE_SIZE).await { + if let Some(sender) = pending_requests.remove(&request_id) { + let _ = sender.send(Err(Error::Network(e))); } + return; } - None => return, }, - incoming = recv_frame(&mut stream, MAX_MESSAGE_SIZE) => { - match incoming { - Ok(response_data) => { - match M::decode(response_data.coalesce()) { - Ok(message) => { - let request_id = message.request_id(); - if let Some(sender) = pending_requests.remove(&request_id) { - let _ = sender.send(Ok(message)); - } - } - Err(_) => { /* ignore */ } - } - } - Err(_e) => { - for (_, sender) in pending_requests.drain() { - let _ = sender.send(Err(Error::RequestChannelClosed)); + Ok(response_data) = recv_frame(&mut stream, MAX_MESSAGE_SIZE) else { + for (_, sender) in pending_requests.drain() { + let _ = sender.send(Err(Error::RequestChannelClosed)); + } + return; + } => { + match M::decode(response_data.coalesce()) { + Ok(message) => { + let request_id = message.request_id(); + if let Some(sender) = pending_requests.remove(&request_id) { + let _ = sender.send(Ok(message)); } - return; } + Err(_) => { /* ignore */ } } }, } diff --git a/macros/impl/src/lib.rs b/macros/impl/src/lib.rs index 4c7956e462..1ecc53ecc6 100644 --- a/macros/impl/src/lib.rs +++ b/macros/impl/src/lib.rs @@ -457,6 +457,14 @@ struct Branch { body: Expr, } +/// Branch for [select_loop!] with optional `else` clause for `Some` patterns. +struct SelectLoopBranch { + pattern: Pat, + future: Expr, + else_body: Option, + body: Expr, +} + impl Parse for SelectInput { fn parse(input: ParseStream<'_>) -> Result { let mut branches = Vec::new(); @@ -524,7 +532,7 @@ struct SelectLoopInput { context: Expr, start_expr: Option, shutdown_expr: Expr, - branches: Vec, + branches: Vec, end_expr: Option, } @@ -581,12 +589,22 @@ impl Parse for SelectLoopInput { let pattern = Pat::parse_single(input)?; input.parse::()?; let future: Expr = input.parse()?; + + // Parse optional else clause: `else expr` + let else_body = if input.peek(Token![else]) { + input.parse::()?; + Some(input.parse::()?) + } else { + None + }; + input.parse::]>()?; let body: Expr = input.parse()?; - branches.push(Branch { + branches.push(SelectLoopBranch { pattern, future, + else_body, body, }); @@ -634,6 +652,30 @@ pub fn select_loop(input: TokenStream) -> TokenStream { end_expr, } = parse_macro_input!(input as SelectLoopInput); + fn is_irrefutable(pat: &Pat) -> bool { + match pat { + Pat::Wild(_) | Pat::Rest(_) => true, + Pat::Ident(i) => i.subpat.as_ref().is_none_or(|(_, p)| is_irrefutable(p)), + Pat::Type(t) => is_irrefutable(&t.pat), + Pat::Tuple(t) => t.elems.iter().all(is_irrefutable), + Pat::Reference(r) => is_irrefutable(&r.pat), + Pat::Paren(p) => is_irrefutable(&p.pat), + _ => false, + } + } + + for b in &branches { + if b.else_body.is_none() && !is_irrefutable(&b.pattern) { + return Error::new_spanned( + &b.pattern, + "refutable patterns require an else clause: \ + `Some(msg) = future else break => { ... }`", + ) + .to_compile_error() + .into(); + } + } + // Convert branches to tokens for the inner select! let branch_tokens: Vec<_> = branches .iter() @@ -641,7 +683,21 @@ pub fn select_loop(input: TokenStream) -> TokenStream { let pattern = &b.pattern; let future = &b.future; let body = &b.body; - quote! { #pattern = #future => #body, } + + // If else clause is present, use let-else to unwrap + b.else_body.as_ref().map_or_else( + // No else: normal pattern binding (already validated as irrefutable) + || quote! { #pattern = #future => #body, }, + // With else: use let-else for refutable patterns + |else_expr| { + quote! { + __select_result = #future => { + let #pattern = __select_result else { #else_expr }; + #body + }, + } + }, + ) }) .collect(); diff --git a/macros/src/lib.rs b/macros/src/lib.rs index f6c92cd125..f42c5c5cef 100644 --- a/macros/src/lib.rs +++ b/macros/src/lib.rs @@ -60,9 +60,10 @@ pub use commonware_macros_impl::select; /// context, /// on_start => { /* optional: runs at start of each iteration */ }, /// on_stopped => { cleanup }, -/// pattern = future => block, +/// pattern = future => body, +/// Some(x) = future else break => body, // refutable pattern with else clause /// // ... -/// on_end => { /* optional: runs after non-shutdown arm completes */ }, +/// on_end => { /* optional: runs after select, skipped on shutdown/break/return/continue */ }, /// } /// ``` /// @@ -71,8 +72,8 @@ pub use commonware_macros_impl::select; /// Can use `continue` to skip the select or `break` to exit the loop. /// 2. `on_stopped` (required) - The shutdown handler, executed when shutdown is signaled. /// 3. Select arms - The futures to select over. -/// 4. `on_end` (optional) - Runs after a non-shutdown arm completes. Skipped when shutdown -/// is triggered. Useful for post-processing that should happen after each arm. +/// 4. `on_end` (optional) - Runs after select completes. Skipped on shutdown or if an arm +/// uses `break`/`return`/`continue`. Useful for per-iteration post-processing. /// /// All blocks share the same lexical scope within the loop body. Variables declared in /// `on_start` are visible in the select arms, `on_stopped`, and `on_end`. This allows @@ -81,10 +82,91 @@ pub use commonware_macros_impl::select; /// The `shutdown` variable (the future from `context.stopped()`) is accessible in the /// shutdown block, allowing explicit cleanup such as `drop(shutdown)` before breaking or returning. /// +/// # Refutable Patterns with `else` +/// +/// For refutable patterns (patterns that may not match), use the `else` clause to specify +/// what happens when the pattern fails to match. This uses Rust's let-else syntax internally: +/// +/// ```rust,ignore +/// // Option handling +/// Some(msg) = rx.recv() else break => { handle(msg); } +/// Some(msg) = rx.recv() else return => { handle(msg); } +/// Some(msg) = rx.recv() else continue => { handle(msg); } +/// +/// // Result handling +/// Ok(value) = result_stream.recv() else break => { process(value); } +/// +/// // Enum variants +/// MyEnum::Data(x) = stream.recv() else continue => { use_data(x); } +/// ``` +/// +/// This replaces the common pattern: +/// ```rust,ignore +/// // Before +/// msg = mailbox.recv() => { +/// let Some(msg) = msg else { break }; +/// // use msg +/// } +/// +/// // After +/// Some(msg) = mailbox.recv() else break => { +/// // use msg directly +/// } +/// ``` +/// +/// # Expansion +/// +/// The macro expands to roughly the following code: +/// +/// ```rust,ignore +/// // Input: +/// select_loop! { +/// context, +/// on_start => { start_code }, +/// on_stopped => { shutdown_code }, +/// pattern = future => { body }, +/// Some(msg) = rx.recv() else break => { handle(msg) }, +/// on_end => { end_code }, +/// } +/// +/// // Expands to: +/// { +/// let mut shutdown = context.stopped(); +/// loop { +/// // on_start runs at the beginning of each iteration +/// { start_code } +/// +/// select_biased! { +/// // Shutdown branch (always first due to biased select) +/// _ = &mut shutdown => { +/// { shutdown_code } +/// break; // on_end is NOT executed on shutdown +/// }, +/// +/// // Regular pattern branch +/// pattern = future => { +/// { body } +/// }, +/// +/// // Refutable pattern with else clause (uses let-else) +/// __select_result = rx.recv() => { +/// let Some(msg) = __select_result else { break }; +/// { handle(msg) } +/// }, +/// } +/// +/// // on_end runs after select completes (skipped on shutdown/break/return/continue) +/// { end_code } +/// } +/// } +/// ``` +/// /// # Example /// /// ```rust,ignore -/// async fn run(context: impl commonware_runtime::Spawner) { +/// use commonware_macros::select_loop; +/// +/// async fn run(context: impl commonware_runtime::Spawner, mut receiver: Receiver) { /// let mut counter = 0; /// commonware_macros::select_loop! { /// context, @@ -97,7 +179,8 @@ pub use commonware_macros_impl::select; /// println!("shutting down after {} iterations", counter); /// drop(shutdown); /// }, -/// msg = receiver.recv() => { +/// // Refutable pattern: breaks when channel closes (None) +/// Some(msg) = receiver.recv() else break => { /// println!("received: {:?}", msg); /// }, /// on_end => { diff --git a/macros/tests/select.rs b/macros/tests/select.rs index c32e794950..bb1f52a0fa 100644 --- a/macros/tests/select.rs +++ b/macros/tests/select.rs @@ -404,4 +404,133 @@ mod tests { assert!(did_shutdown); }); } + + #[test] + fn test_select_loop_refutable_pattern_else_variants() { + block_on(async move { + // else break + let (tx, mut rx) = mpsc::unbounded_channel(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + + let mut received = Vec::new(); + let mock_context = MockSignaler; + select_loop! { + mock_context, + on_stopped => {}, + Some(msg) = rx.recv() else break => { + received.push(msg); + }, + } + assert_eq!(received, vec![1, 2]); + + // else return + async fn with_return() -> Vec { + let (tx, mut rx) = mpsc::unbounded_channel(); + tx.send(10).unwrap(); + tx.send(20).unwrap(); + drop(tx); + + let mut received = Vec::new(); + let mock_context = MockSignaler; + select_loop! { + mock_context, + on_stopped => {}, + Some(msg) = rx.recv() else return received => { + received.push(msg); + }, + } + received.push(999); // Should not be reached + received + } + assert_eq!(with_return().await, vec![10, 20]); + + // else custom block + let (tx, mut rx) = mpsc::unbounded_channel(); + tx.send(100).unwrap(); + drop(tx); + + let mut received = Vec::new(); + let mut closed = false; + let mock_context = MockSignaler; + select_loop! { + mock_context, + on_stopped => {}, + Some(msg) = rx.recv() else { + closed = true; + break; + } => { + received.push(msg); + }, + } + assert_eq!(received, vec![100]); + assert!(closed); + + // else continue + let (tx, mut rx) = mpsc::unbounded_channel(); + tx.send(Some(1)).unwrap(); + tx.send(None).unwrap(); // Triggers else continue + tx.send(Some(2)).unwrap(); + drop(tx); + + let mut received = Vec::new(); + let mut iterations = 0; + let mock_context = MockSignaler; + select_loop! { + mock_context, + on_start => { + iterations += 1; + if iterations > 10 { + break; + } + }, + on_stopped => {}, + Some(Some(value)) = rx.recv() else continue => { + received.push(value); + }, + } + assert_eq!(received, vec![1, 2]); + + // nested pattern + let (tx, mut rx) = mpsc::unbounded_channel::>(); + tx.send(Ok(1)).unwrap(); + tx.send(Err("skip")).unwrap(); + drop(tx); + + let mut received = Vec::new(); + let mock_context = MockSignaler; + select_loop! { + mock_context, + on_stopped => {}, + Some(Ok(value)) = rx.recv() else break => { + received.push(value); + }, + } + assert_eq!(received, vec![1]); + }); + } + + #[test] + fn test_select_loop_backward_compatibility() { + // Verify existing patterns still work exactly as before + block_on(async move { + let (tx, mut rx) = mpsc::unbounded_channel(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + + let mut received = Vec::new(); + let mock_context = MockSignaler; + select_loop! { + mock_context, + on_stopped => {}, + msg = rx.recv() => match msg { + Some(v) => received.push(v), + None => break, + }, + } + assert_eq!(received, vec![1, 2]); + }); + } } diff --git a/p2p/src/authenticated/discovery/actors/peer/actor.rs b/p2p/src/authenticated/discovery/actors/peer/actor.rs index 3f1bb055dc..0793584e6a 100644 --- a/p2p/src/authenticated/discovery/actors/peer/actor.rs +++ b/p2p/src/authenticated/discovery/actors/peer/actor.rs @@ -162,11 +162,9 @@ impl Actor { // Reset ticker deadline = context.current() + self.gossip_bit_vec_frequency; }, - msg_control = self.control.recv() => { - let msg = match msg_control { - Some(msg_control) => msg_control, - None => return Err(Error::PeerDisconnected), - }; + Some(msg) = self.control.recv() else { + return Err(Error::PeerDisconnected); + } => { let (metric, payload) = match msg { Message::BitVec(bit_vec) => ( metrics::Message::new_bit_vec(&peer), diff --git a/p2p/src/authenticated/discovery/actors/router/actor.rs b/p2p/src/authenticated/discovery/actors/router/actor.rs index f59f87af87..7e0a2776ed 100644 --- a/p2p/src/authenticated/discovery/actors/router/actor.rs +++ b/p2p/src/authenticated/discovery/actors/router/actor.rs @@ -97,12 +97,10 @@ impl Actor { on_stopped => { debug!("context shutdown, stopping router"); }, - msg = self.control.recv() => { - let Some(msg) = msg else { - debug!("mailbox closed, stopping router"); - break; - }; - + Some(msg) = self.control.recv() else { + debug!("mailbox closed, stopping router"); + break; + } => { match msg { Message::Ready { peer, diff --git a/p2p/src/authenticated/discovery/actors/spawner/actor.rs b/p2p/src/authenticated/discovery/actors/spawner/actor.rs index 6eaf0eeb90..bd9d29826f 100644 --- a/p2p/src/authenticated/discovery/actors/spawner/actor.rs +++ b/p2p/src/authenticated/discovery/actors/spawner/actor.rs @@ -108,12 +108,10 @@ impl { debug!("context shutdown, stopping spawner"); }, - msg = self.receiver.recv() => { - let Some(msg) = msg else { - debug!("mailbox closed, stopping spawner"); - break; - }; - + Some(msg) = self.receiver.recv() else { + debug!("mailbox closed, stopping spawner"); + break; + } => { match msg { Message::Spawn { peer, diff --git a/p2p/src/authenticated/discovery/actors/tracker/actor.rs b/p2p/src/authenticated/discovery/actors/tracker/actor.rs index 1b96ad66f0..d1a34d85b6 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/actor.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/actor.rs @@ -137,11 +137,10 @@ impl Actor { _ = self.directory.wait_for_unblock() => { self.directory.unblock_expired(); }, - msg = self.receiver.recv() => { - let Some(msg) = msg else { - debug!("mailbox closed, stopping tracker"); - break; - }; + Some(msg) = self.receiver.recv() else { + debug!("mailbox closed, stopping tracker"); + break; + } => { self.handle_msg(msg).await; }, } diff --git a/p2p/src/authenticated/discovery/mod.rs b/p2p/src/authenticated/discovery/mod.rs index 871de208d0..264efa9ec0 100644 --- a/p2p/src/authenticated/discovery/mod.rs +++ b/p2p/src/authenticated/discovery/mod.rs @@ -904,12 +904,7 @@ mod tests { select_loop! { context, on_stopped => {}, - result = receiver.recv() => { - if result.is_err() { - // Channel closed due to shutdown - break; - } - }, + Ok(_) = receiver.recv() else break => {}, } } }); diff --git a/p2p/src/authenticated/lookup/actors/listener.rs b/p2p/src/authenticated/lookup/actors/listener.rs index b208d454c5..b71f08f9c3 100644 --- a/p2p/src/authenticated/lookup/actors/listener.rs +++ b/p2p/src/authenticated/lookup/actors/listener.rs @@ -183,11 +183,10 @@ impl Actor { debug!("context shutdown, stopping listener"); }, - update = self.mailbox.recv() => { - let Some(registered_ips) = update else { - debug!("mailbox closed"); - break; - }; + Some(registered_ips) = self.mailbox.recv() else { + debug!("mailbox closed"); + break; + } => { self.registered_ips = registered_ips; }, listener = listener.accept() => { diff --git a/p2p/src/authenticated/lookup/actors/peer/actor.rs b/p2p/src/authenticated/lookup/actors/peer/actor.rs index b1e55b58c6..451a9d4f38 100644 --- a/p2p/src/authenticated/lookup/actors/peer/actor.rs +++ b/p2p/src/authenticated/lookup/actors/peer/actor.rs @@ -148,14 +148,10 @@ impl Actor { // Reset ticker deadline = context.current() + self.ping_frequency; }, - msg_control = self.control.recv() => { - let msg = match msg_control { - Some(msg_control) => msg_control, - None => return Err(Error::PeerDisconnected), - }; - match msg { - Message::Kill => return Err(Error::PeerKilled(peer.to_string())), - } + Some(msg) = self.control.recv() else { + return Err(Error::PeerDisconnected); + } => match msg { + Message::Kill => return Err(Error::PeerKilled(peer.to_string())), }, msg_high = self.high.recv() => { // Data is already pre-encoded, just forward to stream diff --git a/p2p/src/authenticated/lookup/actors/router/actor.rs b/p2p/src/authenticated/lookup/actors/router/actor.rs index f76ae52c5e..93ba3e080f 100644 --- a/p2p/src/authenticated/lookup/actors/router/actor.rs +++ b/p2p/src/authenticated/lookup/actors/router/actor.rs @@ -97,11 +97,10 @@ impl Actor { on_stopped => { debug!("context shutdown, stopping router"); }, - msg = self.control.recv() => { - let Some(msg) = msg else { - debug!("mailbox closed, stopping router"); - break; - }; + Some(msg) = self.control.recv() else { + debug!("mailbox closed, stopping router"); + break; + } => { match msg { Message::Ready { peer, diff --git a/p2p/src/authenticated/lookup/actors/spawner/actor.rs b/p2p/src/authenticated/lookup/actors/spawner/actor.rs index c2d5ecd6c1..99f2cbe9c9 100644 --- a/p2p/src/authenticated/lookup/actors/spawner/actor.rs +++ b/p2p/src/authenticated/lookup/actors/spawner/actor.rs @@ -96,11 +96,10 @@ impl { debug!("context shutdown, stopping spawner"); }, - msg = self.receiver.recv() => { - let Some(msg) = msg else { - debug!("mailbox closed, stopping spawner"); - break; - }; + Some(msg) = self.receiver.recv() else { + debug!("mailbox closed, stopping spawner"); + break; + } => { match msg { Message::Spawn { peer, diff --git a/p2p/src/authenticated/lookup/actors/tracker/actor.rs b/p2p/src/authenticated/lookup/actors/tracker/actor.rs index 19089a5821..9ce8dccf21 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/actor.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/actor.rs @@ -122,11 +122,10 @@ impl Actor { .await; } }, - msg = self.receiver.recv() => { - let Some(msg) = msg else { - debug!("mailbox closed, stopping tracker"); - break; - }; + Some(msg) = self.receiver.recv() else { + debug!("mailbox closed, stopping tracker"); + break; + } => { self.handle_msg(msg).await; }, } diff --git a/p2p/src/simulated/network.rs b/p2p/src/simulated/network.rs index e2ad33404d..aa496d692a 100644 --- a/p2p/src/simulated/network.rs +++ b/p2p/src/simulated/network.rs @@ -685,34 +685,26 @@ impl Network } async fn run(mut self) { - loop { - let tick = match self.transmitter.next() { - Some(when) => Either::Left(self.context.sleep_until(when)), - None => Either::Right(future::pending()), - }; - select! { - _ = tick => { - let now = self.context.current(); - let completions = self.transmitter.advance(now); - self.process_completions(completions); - }, - message = self.ingress.recv() => { - // If ingress is closed, exit - let message = match message { - Some(message) => message, - None => break, - }; - self.handle_ingress(message).await; - }, - task = self.receiver.recv() => { - // If receiver is closed, exit - let task = match task { - Some(task) => task, - None => break, - }; - self.handle_task(task); - }, - } + select_loop! { + self.context, + on_start => { + let tick = match self.transmitter.next() { + Some(when) => Either::Left(self.context.sleep_until(when)), + None => Either::Right(future::pending()), + }; + }, + on_stopped => {}, + _ = tick => { + let now = self.context.current(); + let completions = self.transmitter.advance(now); + self.process_completions(completions); + }, + Some(message) = self.ingress.recv() else break => { + self.handle_ingress(message).await; + }, + Some(task) = self.receiver.recv() else break => { + self.handle_task(task); + }, } } } @@ -1093,7 +1085,11 @@ impl Peer

{ ) -> Self { // The control is used to register channels. // There is exactly one mailbox created for each channel that the peer is registered for. - let (control_sender, mut control_receiver) = mpsc::unbounded_channel(); + #[allow(clippy::type_complexity)] + let (control_sender, mut control_receiver): ( + mpsc::UnboundedSender<(Channel, Handle<()>, oneshot::Sender>)>, + _, + ) = mpsc::unbounded_channel(); // Whenever a message is received from a peer, it is placed in the inbox. // The router polls the inbox and forwards the message to the appropriate mailbox. @@ -1109,17 +1105,7 @@ impl Peer

{ context, on_stopped => {}, // Listen for control messages, which are used to register channels - control = control_receiver.recv() => { - // If control is closed, exit - let (channel, sender, result_tx): ( - Channel, - Handle<()>, - oneshot::Sender>, - ) = match control { - Some(control) => control, - None => break, - }; - + Some((channel, sender, result_tx)) = control_receiver.recv() else break => { // Register channel let (receiver_tx, receiver_rx) = mpsc::unbounded_channel(); if let Some((_, existing_sender)) = @@ -1132,13 +1118,7 @@ impl Peer

{ }, // Listen for messages from the inbox, which are forwarded to the appropriate mailbox - inbox = inbox_receiver.recv() => { - // If inbox is closed, exit - let (channel, message) = match inbox { - Some(message) => message, - None => break, - }; - + Some((channel, message)) = inbox_receiver.recv() else break => { // Send message to mailbox match mailboxes.get_mut(&channel) { Some((receiver_tx, _)) => { diff --git a/p2p/src/utils/mux.rs b/p2p/src/utils/mux.rs index 9e013b7c76..febee10f4f 100644 --- a/p2p/src/utils/mux.rs +++ b/p2p/src/utils/mux.rs @@ -117,28 +117,25 @@ impl Muxer { }, // Prefer control messages because network messages will // already block when full (providing backpressure). - control = self.control_rx.recv() => { - match control { - Some(Control::Register { subchannel, sender }) => { - // If the subchannel is already registered, drop the sender. - if self.routes.contains_key(&subchannel) { - continue; - } - - // Otherwise, create a new subchannel and send the receiver to the caller. - let (tx, rx) = mpsc::channel(self.mailbox_size); - self.routes.insert(subchannel, tx); - let _ = sender.send(rx); - } - Some(Control::Deregister { subchannel }) => { - // Remove the route. - self.routes.remove(&subchannel); - } - None => { - // If the control channel is closed, we can shut down since there must - // be no more registrations, and all receivers must have been dropped. - return Ok(()); + Some(control) = self.control_rx.recv() else { + // If the control channel is closed, we can shut down since there must + // be no more registrations, and all receivers must have been dropped. + return Ok(()); + } => match control { + Control::Register { subchannel, sender } => { + // If the subchannel is already registered, drop the sender. + if self.routes.contains_key(&subchannel) { + continue; } + + // Otherwise, create a new subchannel and send the receiver to the caller. + let (tx, rx) = mpsc::channel(self.mailbox_size); + self.routes.insert(subchannel, tx); + let _ = sender.send(rx); + } + Control::Deregister { subchannel } => { + // Remove the route. + self.routes.remove(&subchannel); } }, // Process network messages. diff --git a/resolver/src/p2p/engine.rs b/resolver/src/p2p/engine.rs index e2f5a7a553..8e7c78023e 100644 --- a/resolver/src/p2p/engine.rs +++ b/resolver/src/p2p/engine.rs @@ -191,12 +191,10 @@ impl< self.serves.cancel_all(); }, // Handle peer set updates - peer_set_update = peer_set_subscription.recv() => { - let Some((id, _, all)) = peer_set_update else { - debug!("peer set subscription closed"); - return; - }; - + Some((id, _, all)) = peer_set_subscription.recv() else { + debug!("peer set subscription closed"); + return; + } => { // Instead of directing our requests to exclusively the latest set (which may still be syncing, we // reconcile with all tracked peers). if self.last_peer_set_id < Some(id) { @@ -217,11 +215,10 @@ impl< self.fetcher.fetch(&mut sender).await; }, // Handle mailbox messages - msg = self.mailbox.recv() => { - let Some(msg) = msg else { - error!("mailbox closed"); - return; - }; + Some(msg) = self.mailbox.recv() else { + error!("mailbox closed"); + return; + } => { match msg { Message::Fetch(requests) => { for FetchRequest { key, targets } in requests {