diff --git a/broadcast/src/buffered/engine.rs b/broadcast/src/buffered/engine.rs index a37d877860..176d480281 100644 --- a/broadcast/src/buffered/engine.rs +++ b/broadcast/src/buffered/engine.rs @@ -168,15 +168,31 @@ impl { + Message::Broadcast { + recipients, + message, + responder, + } => { trace!("mailbox: broadcast"); - self.handle_broadcast(&mut sender, recipients, message, responder).await; + self.handle_broadcast(&mut sender, recipients, message, responder) + .await; } - Message::Subscribe{ peer, commitment, digest, responder } => { + Message::Subscribe { + peer, + commitment, + digest, + responder, + } => { trace!("mailbox: subscribe"); - self.handle_subscribe(peer, commitment, digest, responder).await; + self.handle_subscribe(peer, commitment, digest, responder) + .await; } - Message::Get{ peer, commitment, digest, responder } => { + Message::Get { + peer, + commitment, + digest, + responder, + } => { trace!("mailbox: get"); self.handle_get(peer, commitment, digest, responder).await; } @@ -204,7 +220,10 @@ impl { if let Some(command) = command { match command { - Message::Send { request, recipients, responder } => { + Message::Send { + request, + recipients, + responder, + } => { // Track commitment (if not already tracked) let commitment = request.commitment(); let entry = self.tracked.entry(commitment).or_insert_with(|| { @@ -145,11 +149,10 @@ where }); // Send the request to recipients - match req_tx.send( - recipients, - request, - self.priority_request - ).await { + match req_tx + .send(recipients, request, self.priority_request) + .await + { Ok(recipients) => { entry.0.extend(recipients.iter().cloned()); responder.send_lossy(Ok(recipients)); @@ -159,7 +162,7 @@ where responder.send_lossy(Err(Error::SendFailed(err.into()))); } } - }, + } Message::Cancel { commitment } => { if self.tracked.remove(&commitment).is_none() { debug!(?commitment, "ignoring removal of unknown commitment"); @@ -179,11 +182,9 @@ where self.responses.inc(); // Send the response - let _ = res_tx.send( - Recipients::One(peer), - reply, - self.priority_response - ).await; + let _ = res_tx + .send(Recipients::One(peer), reply, self.priority_response) + .await; }, // Request from an originator @@ -210,9 +211,7 @@ where // Handle the request let (tx, rx) = oneshot::channel(); self.handler.process(peer.clone(), msg, tx).await; - processed.push(async move { - Ok((peer, rx.await?)) - }); + processed.push(async move { Ok((peer, rx.await?)) }); }, // Response from a handler diff --git a/collector/src/p2p/mod.rs b/collector/src/p2p/mod.rs index 7ae144fc52..6cd1e588f1 100644 --- a/collector/src/p2p/mod.rs +++ b/collector/src/p2p/mod.rs @@ -309,7 +309,7 @@ mod tests { }, _ = context.sleep(Duration::from_millis(5_000)) => { // Expected: no events - } + }, } }); } @@ -476,7 +476,7 @@ mod tests { }, _ = context.sleep(Duration::from_millis(5_000)) => { // Expected: no more responses - } + }, } }); } @@ -629,7 +629,7 @@ mod tests { }, _ = context.sleep(Duration::from_millis(1_000)) => { // Expected: no events - } + }, } }); } @@ -673,7 +673,7 @@ mod tests { }, _ = context.sleep(Duration::from_millis(1_000)) => { // Expected: no events - } + }, } }); } @@ -851,7 +851,7 @@ mod tests { }, _ = context.sleep(Duration::from_millis(1_000)) => { // Expected: no more events - } + }, } }); } diff --git a/consensus/fuzz/src/disrupter.rs b/consensus/fuzz/src/disrupter.rs index 7f96e322f7..e66a6561a0 100644 --- a/consensus/fuzz/src/disrupter.rs +++ b/consensus/fuzz/src/disrupter.rs @@ -195,7 +195,7 @@ where // We ignore resolver messages _ = self.context.sleep(TIMEOUT) => { self.send_random(&mut vote_sender).await; - } + }, } } } diff --git a/consensus/src/aggregation/engine.rs b/consensus/src/aggregation/engine.rs index dcae4df0b5..be98db113e 100644 --- a/consensus/src/aggregation/engine.rs +++ b/consensus/src/aggregation/engine.rs @@ -311,29 +311,34 @@ impl< self.epoch = epoch; // Update the tip manager - let scheme = self.scheme(self.epoch) + let scheme = self + .scheme(self.epoch) .expect("current epoch scheme must exist"); self.safe_tip.reconcile(scheme.participants()); // Update data structures by purging old epochs let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0); - self.pending.iter_mut().for_each(|(_, pending)| { - match pending { + self.pending + .iter_mut() + .for_each(|(_, pending)| match pending { Pending::Unverified(acks) => { acks.retain(|epoch, _| *epoch >= min_epoch); } Pending::Verified(_, acks) => { acks.retain(|epoch, _| *epoch >= min_epoch); } - } - }); + }); continue; }, // Sign a new ack request = self.digest_requests.next_completed() => { - let DigestRequest { height, result, timer } = request; + let DigestRequest { + height, + result, + timer, + } = request; drop(timer); // Record metric. Explicitly reference timer to avoid lint warning. match result { Err(err) => { @@ -404,7 +409,10 @@ impl< // Rebroadcast _ = rebroadcast => { // Get the next height to rebroadcast - let (height, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline"); + let (height, _) = self + .rebroadcast_deadlines + .pop() + .expect("no rebroadcast deadline"); trace!(%height, "rebroadcasting"); if let Err(err) = self.handle_rebroadcast(height, &mut sender).await { warn!(?err, %height, "rebroadcast failed"); diff --git a/consensus/src/marshal/actor.rs b/consensus/src/marshal/actor.rs index a42aea87bc..062a07aa2a 100644 --- a/consensus/src/marshal/actor.rs +++ b/consensus/src/marshal/actor.rs @@ -327,7 +327,9 @@ where }, // Handle application acknowledgements next ack = &mut self.pending_ack => { - let PendingAck { height, commitment, .. } = self.pending_ack.take().expect("ack state must be present"); + let PendingAck { + height, commitment, .. + } = self.pending_ack.take().expect("ack state must be present"); match ack { Ok(()) => { @@ -353,7 +355,10 @@ where break; }; match message { - Message::GetInfo { identifier, response } => { + Message::GetInfo { + identifier, + response, + } => { let info = match identifier { // TODO: Instead of pulling out the entire block, determine the // height directly from the archive by mapping the commitment to @@ -372,15 +377,13 @@ where .ok() .flatten() .map(|f| (height, f.proposal.payload)), - BlockID::Latest => self - .get_latest() - .await - .map(|(h, c, _)| (h, c)), + BlockID::Latest => self.get_latest().await.map(|(h, c, _)| (h, c)), }; response.send_lossy(info); } Message::Proposed { round, block } => { - self.cache_verified(round, block.commitment(), block.clone()).await; + self.cache_verified(round, block.commitment(), block.clone()) + .await; let _peers = buffer.broadcast(Recipients::All, block).await; } Message::Verified { round, block } => { @@ -391,7 +394,9 @@ where let commitment = notarization.proposal.payload; // Store notarization by view - self.cache.put_notarization(round, commitment, notarization.clone()).await; + self.cache + .put_notarization(round, commitment, notarization.clone()) + .await; // Search for block locally, otherwise fetch it remotely if let Some(block) = self.find_block(&mut buffer, commitment).await { @@ -406,7 +411,9 @@ where // Cache finalization by round let round = finalization.round(); let commitment = finalization.proposal.payload; - self.cache.put_finalization(round, commitment, finalization.clone()).await; + self.cache + .put_finalization(round, commitment, finalization.clone()) + .await; // Search for block locally, otherwise fetch it remotely if let Some(block) = self.find_block(&mut buffer, commitment).await { @@ -429,25 +436,28 @@ where resolver.fetch(Request::::Block(commitment)).await; } } - Message::GetBlock { identifier, response } => { - match identifier { - BlockID::Commitment(commitment) => { - let result = self.find_block(&mut buffer, commitment).await; - response.send_lossy(result); - } - BlockID::Height(height) => { - let result = self.get_finalized_block(height).await; - response.send_lossy(result); - } - BlockID::Latest => { - let block = match self.get_latest().await { - Some((_, commitment, _)) => self.find_block(&mut buffer, commitment).await, - None => None, - }; - response.send_lossy(block); - } + Message::GetBlock { + identifier, + response, + } => match identifier { + BlockID::Commitment(commitment) => { + let result = self.find_block(&mut buffer, commitment).await; + response.send_lossy(result); } - } + BlockID::Height(height) => { + let result = self.get_finalized_block(height).await; + response.send_lossy(result); + } + BlockID::Latest => { + let block = match self.get_latest().await { + Some((_, commitment, _)) => { + self.find_block(&mut buffer, commitment).await + } + None => None, + }; + response.send_lossy(block); + } + }, Message::GetFinalization { height, response } => { let finalization = self.get_finalization_by_height(height).await; response.send_lossy(finalization); @@ -467,7 +477,11 @@ where let request = Request::::Finalized { height }; resolver.fetch_targeted(request, targets).await; } - Message::Subscribe { round, commitment, response } => { + Message::Subscribe { + round, + commitment, + response, + } => { // Check for block locally if let Some(block) = self.find_block(&mut buffer, commitment).await { response.send_lossy(block); @@ -566,7 +580,8 @@ where match key { Request::Block(commitment) => { // Check for block locally - let Some(block) = self.find_block(&mut buffer, commitment).await else { + let Some(block) = self.find_block(&mut buffer, commitment).await + else { debug!(?commitment, "block missing on request"); continue; }; @@ -574,7 +589,9 @@ where } Request::Finalized { height } => { // Get finalization - let Some(finalization) = self.get_finalization_by_height(height).await else { + let Some(finalization) = + self.get_finalization_by_height(height).await + else { debug!(%height, "finalization missing on request"); continue; }; @@ -590,26 +607,34 @@ where } Request::Notarized { round } => { // Get notarization - let Some(notarization) = self.cache.get_notarization(round).await else { + let Some(notarization) = self.cache.get_notarization(round).await + else { debug!(?round, "notarization missing on request"); continue; }; // Get block let commitment = notarization.proposal.payload; - let Some(block) = self.find_block(&mut buffer, commitment).await else { + let Some(block) = self.find_block(&mut buffer, commitment).await + else { debug!(?commitment, "block missing on request"); continue; }; response.send_lossy((notarization, block).encode()); } } - }, - handler::Message::Deliver { key, value, response } => { + } + handler::Message::Deliver { + key, + value, + response, + } => { match key { Request::Block(commitment) => { // Parse block - let Ok(block) = B::decode_cfg(value.as_ref(), &self.block_codec_config) else { + let Ok(block) = + B::decode_cfg(value.as_ref(), &self.block_codec_config) + else { response.send_lossy(false); continue; }; @@ -622,7 +647,8 @@ where // Persist the block, also persisting the finalization if we have it let height = block.height(); - let finalization = self.cache.get_finalization_for(commitment).await; + let finalization = + self.cache.get_finalization_for(commitment).await; self.finalize( height, commitment, @@ -635,13 +661,15 @@ where .await; debug!(?commitment, %height, "received block"); response.send_lossy(true); - }, + } Request::Finalized { height } => { let Some(bounds) = self.epocher.containing(height) else { response.send_lossy(false); continue; }; - let Some(scheme) = self.get_scheme_certificate_verifier(bounds.epoch()) else { + let Some(scheme) = + self.get_scheme_certificate_verifier(bounds.epoch()) + else { response.send_lossy(false); continue; }; @@ -650,7 +678,10 @@ where let Ok((finalization, block)) = <(Finalization, B)>::decode_cfg( value, - &(scheme.certificate_codec_config(), self.block_codec_config.clone()), + &( + scheme.certificate_codec_config(), + self.block_codec_config.clone(), + ), ) else { response.send_lossy(false); @@ -660,7 +691,11 @@ where // Validation if block.height() != height || finalization.proposal.payload != block.commitment() - || !finalization.verify(&mut self.context, &scheme, &self.strategy) + || !finalization.verify( + &mut self.context, + &scheme, + &self.strategy, + ) { response.send_lossy(false); continue; @@ -679,9 +714,11 @@ where &mut resolver, ) .await; - }, + } Request::Notarized { round } => { - let Some(scheme) = self.get_scheme_certificate_verifier(round.epoch()) else { + let Some(scheme) = + self.get_scheme_certificate_verifier(round.epoch()) + else { response.send_lossy(false); continue; }; @@ -690,7 +727,10 @@ where let Ok((notarization, block)) = <(Notarization, B)>::decode_cfg( value, - &(scheme.certificate_codec_config(), self.block_codec_config.clone()), + &( + scheme.certificate_codec_config(), + self.block_codec_config.clone(), + ), ) else { response.send_lossy(false); @@ -700,7 +740,11 @@ where // Validation if notarization.round() != round || notarization.proposal.payload != block.commitment() - || !notarization.verify(&mut self.context, &scheme, &self.strategy) + || !notarization.verify( + &mut self.context, + &scheme, + &self.strategy, + ) { response.send_lossy(false); continue; @@ -718,7 +762,9 @@ where // resolve the request for the notarization before we resolve // the request for the block. let height = block.height(); - if let Some(finalization) = self.cache.get_finalization_for(commitment).await { + if let Some(finalization) = + self.cache.get_finalization_for(commitment).await + { self.finalize( height, commitment, @@ -733,10 +779,12 @@ where // Cache the notarization and block self.cache_block(round, commitment, block).await; - self.cache.put_notarization(round, commitment, notarization).await; - }, + self.cache + .put_notarization(round, commitment, notarization) + .await; + } } - }, + } } }, } diff --git a/consensus/src/marshal/mod.rs b/consensus/src/marshal/mod.rs index f77deabdc4..6df7f79618 100644 --- a/consensus/src/marshal/mod.rs +++ b/consensus/src/marshal/mod.rs @@ -2448,10 +2448,7 @@ mod tests { // Use select with timeout to detect never-resolving receiver select! { result = certify_a => { - assert!( - result.unwrap(), - "Block A certification should succeed" - ); + assert!(result.unwrap(), "Block A certification should succeed"); }, _ = context.sleep(Duration::from_secs(5)) => { panic!("Block A certification timed out"); diff --git a/consensus/src/ordered_broadcast/engine.rs b/consensus/src/ordered_broadcast/engine.rs index 495050f38f..3768c91d3c 100644 --- a/consensus/src/ordered_broadcast/engine.rs +++ b/consensus/src/ordered_broadcast/engine.rs @@ -374,7 +374,10 @@ impl< }; // Propose the chunk - if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await { + if let Err(err) = self + .propose(context.clone(), payload, &mut node_sender) + .await + { warn!(?err, ?context, "propose new failed"); continue; } @@ -414,7 +417,12 @@ impl< // Handle the parent certificate if let Some(parent_chunk) = result { let parent = node.parent.as_ref().unwrap(); - self.handle_certificate(&parent_chunk, parent.epoch, parent.certificate.clone()).await; + self.handle_certificate( + &parent_chunk, + parent.epoch, + parent.certificate.clone(), + ) + .await; } // Process the node @@ -459,7 +467,12 @@ impl< // Handle completed verification futures. verify = self.pending_verifies.next_completed() => { - let Verify { timer, context, payload, result } = verify; + let Verify { + timer, + context, + payload, + result, + } = verify; drop(timer); // Record metric. Explicitly reference timer to avoid lint warning. match result { Err(err) => { @@ -473,10 +486,13 @@ impl< Ok(true) => { debug!(?context, "verified"); self.metrics.verify.inc(Status::Success); - if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await { + if let Err(err) = self + .handle_app_verified(&context, &payload, &mut ack_sender) + .await + { debug!(?err, ?context, ?payload, "verified handle failed"); } - }, + } } }, } diff --git a/consensus/src/ordered_broadcast/mod.rs b/consensus/src/ordered_broadcast/mod.rs index c0b4d933c2..0208370bf9 100644 --- a/consensus/src/ordered_broadcast/mod.rs +++ b/consensus/src/ordered_broadcast/mod.rs @@ -447,8 +447,8 @@ mod tests { ); select! { - _ = crash => { false }, - _ = run => { true }, + _ = crash => false, + _ = run => true, } }; diff --git a/consensus/src/simplex/actors/batcher/actor.rs b/consensus/src/simplex/actors/batcher/actor.rs index d6bcb5bbf8..44a49f017c 100644 --- a/consensus/src/simplex/actors/batcher/actor.rs +++ b/consensus/src/simplex/actors/batcher/actor.rs @@ -206,8 +206,7 @@ impl< }) => { current = new_current; finalized = new_finalized; - work - .entry(current) + work.entry(current) .or_insert_with(|| self.new_round()) .set_leader(leader); @@ -228,13 +227,7 @@ impl< Some(Message::Constructed(message)) => { // If the view isn't interesting, we can skip let view = message.view(); - if !interesting( - self.activity_timeout, - finalized, - current, - view, - false, - ) { + if !interesting(self.activity_timeout, finalized, current, view, false) { continue; } @@ -301,19 +294,14 @@ impl< } // Verify the certificate - if !notarization.verify( - &mut self.context, - &self.scheme, - &self.strategy, - ) { + if !notarization.verify(&mut self.context, &self.scheme, &self.strategy) { warn!(?sender, %view, "blocking peer for invalid notarization"); self.blocker.block(sender).await; continue; } // Store and forward to voter - work - .entry(view) + work.entry(view) .or_insert_with(|| self.new_round()) .set_notarization(notarization.clone()); voter @@ -339,8 +327,7 @@ impl< } // Store and forward to voter - work - .entry(view) + work.entry(view) .or_insert_with(|| self.new_round()) .set_nullification(nullification.clone()); voter @@ -355,19 +342,14 @@ impl< } // Verify the certificate - if !finalization.verify( - &mut self.context, - &self.scheme, - &self.strategy, - ) { + if !finalization.verify(&mut self.context, &self.scheme, &self.strategy) { warn!(?sender, %view, "blocking peer for invalid finalization"); self.blocker.block(sender).await; continue; } // Store and forward to voter - work - .entry(view) + work.entry(view) .or_insert_with(|| self.new_round()) .set_finalization(finalization.clone()); voter @@ -410,13 +392,7 @@ impl< // If the view isn't interesting, we can skip let view = message.view(); - if !interesting( - self.activity_timeout, - finalized, - current, - view, - false, - ) { + if !interesting(self.activity_timeout, finalized, current, view, false) { continue; } @@ -426,15 +402,16 @@ impl< .entry(view) .or_insert_with(|| self.new_round()) .add_network(sender, message) - .await { - self.added.inc(); + .await + { + self.added.inc(); - // Update per-peer latest vote metric (only if higher than current) - let _ = self - .latest_vote - .get_or_create(&peer) - .try_set_max(view.get()); - } + // Update per-peer latest vote metric (only if higher than current) + let _ = self + .latest_vote + .get_or_create(&peer) + .try_set_max(view.get()); + } updated_view = view; }, on_end => { diff --git a/consensus/src/simplex/actors/batcher/mod.rs b/consensus/src/simplex/actors/batcher/mod.rs index 4339c4cf23..51bd7ae580 100644 --- a/consensus/src/simplex/actors/batcher/mod.rs +++ b/consensus/src/simplex/actors/batcher/mod.rs @@ -1417,16 +1417,14 @@ mod tests { // Should NOT receive any certificate for the finalized view select! { - msg = voter_receiver.next() => { - match msg { - Some(voter::Message::Proposal(_)) => {}, - Some(voter::Message::Verified(cert, _)) if cert.view() == view2 => { - panic!("should not receive any certificate for the finalized view"); - }, - _ => {}, + msg = voter_receiver.next() => match msg { + Some(voter::Message::Proposal(_)) => {} + Some(voter::Message::Verified(cert, _)) if cert.view() == view2 => { + panic!("should not receive any certificate for the finalized view"); } + _ => {} }, - _ = context.sleep(Duration::from_millis(200)) => { }, + _ = context.sleep(Duration::from_millis(200)) => {}, }; }); } diff --git a/consensus/src/simplex/actors/resolver/actor.rs b/consensus/src/simplex/actors/resolver/actor.rs index be6117da3b..8f5e871011 100644 --- a/consensus/src/simplex/actors/resolver/actor.rs +++ b/consensus/src/simplex/actors/resolver/actor.rs @@ -137,7 +137,9 @@ impl< self.state.handle(certificate, None, &mut resolver).await; } MailboxMessage::Certified { view, success } => { - self.state.handle_certified(view, success, &mut resolver).await; + self.state + .handle_certified(view, success, &mut resolver) + .await; } } }, @@ -145,7 +147,8 @@ impl< let Some(message) = handler else { break; }; - self.handle_resolver(message, &mut voter, &mut resolver).await; + self.handle_resolver(message, &mut voter, &mut resolver) + .await; }, } } diff --git a/consensus/src/simplex/actors/voter/actor.rs b/consensus/src/simplex/actors/voter/actor.rs index be20183adb..2bb7086585 100644 --- a/consensus/src/simplex/actors/voter/actor.rs +++ b/consensus/src/simplex/actors/voter/actor.rs @@ -841,11 +841,17 @@ impl< debug!("context shutdown, stopping voter"); // Sync and drop journal - self.journal.take().unwrap().sync_all().await.expect("unable to sync journal"); + self.journal + .take() + .unwrap() + .sync_all() + .await + .expect("unable to sync journal"); }, _ = self.context.sleep_until(timeout) => { // Trigger the timeout - self.handle_timeout(&mut batcher, &mut vote_sender, &mut certificate_sender).await; + self.handle_timeout(&mut batcher, &mut vote_sender, &mut certificate_sender) + .await; view = self.state.current_view(); }, (context, proposed) = propose_wait => { @@ -870,11 +876,7 @@ impl< } // Construct proposal - let proposal = Proposal::new( - context.round, - context.parent.0, - proposed, - ); + let proposal = Proposal::new(context.round, context.parent.0, proposed); if !self.state.proposed(proposal) { warn!(round = ?context.round, "dropped our proposal"); continue; @@ -894,13 +896,17 @@ impl< Ok(true) => { // Mark verification complete self.state.verified(view); - }, + } Ok(false) => { // Verification failed for current view proposal, treat as immediate timeout debug!(round = ?context.round, "proposal failed verification"); - self.handle_timeout(&mut batcher, &mut vote_sender, &mut certificate_sender) - .await; - }, + self.handle_timeout( + &mut batcher, + &mut vote_sender, + &mut certificate_sender, + ) + .await; + } Err(err) => { debug!(?err, round = ?context.round, "failed to verify proposal"); } @@ -976,7 +982,8 @@ impl< } Certificate::Nullification(nullification) => { trace!(%view, from_resolver, "received nullification"); - if let Some(floor) = self.handle_nullification(nullification).await { + if let Some(floor) = self.handle_nullification(nullification).await + { warn!(?floor, "broadcasting nullification floor"); self.broadcast_certificate(&mut certificate_sender, floor) .await; diff --git a/consensus/src/simplex/actors/voter/mod.rs b/consensus/src/simplex/actors/voter/mod.rs index 42fff4b84b..366e7440d7 100644 --- a/consensus/src/simplex/actors/voter/mod.rs +++ b/consensus/src/simplex/actors/voter/mod.rs @@ -2122,10 +2122,16 @@ mod tests { let batcher_mailbox = batcher::Mailbox::new(batcher_sender); // Register network channels for the validator - let (vote_sender, _vote_receiver) = - oracle.control(me.clone()).register(0, TEST_QUOTA).await.unwrap(); - let (certificate_sender, _certificate_receiver) = - oracle.control(me.clone()).register(1, TEST_QUOTA).await.unwrap(); + let (vote_sender, _vote_receiver) = oracle + .control(me.clone()) + .register(0, TEST_QUOTA) + .await + .unwrap(); + let (certificate_sender, _certificate_receiver) = oracle + .control(me.clone()) + .register(1, TEST_QUOTA) + .await + .unwrap(); // Start the actor voter.start( @@ -2153,8 +2159,7 @@ mod tests { let (target_view, leader) = loop { // Send finalization to advance to next view - let (_, finalization) = - build_finalization(&schemes, &prev_proposal, quorum); + let (_, finalization) = build_finalization(&schemes, &prev_proposal, quorum); mailbox .resolved(Certificate::Finalization(finalization)) .await; @@ -2210,26 +2215,27 @@ mod tests { .as_slice(), ); let contents = (proposal.round, parent_payload, 0u64).encode(); - relay - .broadcast(&leader, (proposal.payload, contents)) - .await; + relay.broadcast(&leader, (proposal.payload, contents)).await; mailbox.proposal(proposal).await; // Wait for nullify vote for target_view. Since timeouts are 10s, receiving it // within 1s proves it came from verification failure, not timeout. loop { select! { - msg = batcher_receiver.next() => { - match msg.unwrap() { - batcher::Message::Constructed(Vote::Nullify(nullify)) if nullify.view() == target_view => { - break; - } - batcher::Message::Update { active, .. } => active.send(true).unwrap(), - _ => {} + msg = batcher_receiver.next() => match msg.unwrap() { + batcher::Message::Constructed(Vote::Nullify(nullify)) + if nullify.view() == target_view => + { + break; } + batcher::Message::Update { active, .. } => active.send(true).unwrap(), + _ => {} }, _ = context.sleep(Duration::from_secs(1)) => { - panic!("expected nullify for view {} within 1s (timeouts are 10s)", target_view); + panic!( + "expected nullify for view {} within 1s (timeouts are 10s)", + target_view + ); }, } } @@ -2695,9 +2701,7 @@ mod tests { msg = resolver_receiver.next() => { matches!(msg, Some(MailboxMessage::Certified { .. })) }, - _ = context.sleep(Duration::from_secs(4)) => { - false - }, + _ = context.sleep(Duration::from_secs(4)) => false, }; assert!( @@ -2794,12 +2798,12 @@ mod tests { // Wait for timeout (nullify vote) WITHOUT sending notarize first loop { select! { - msg = batcher_receiver.next() => { - match msg.unwrap() { - batcher::Message::Constructed(Vote::Nullify(n)) if n.view() == target_view => break, - batcher::Message::Update { active, .. } => active.send(true).unwrap(), - _ => {} - } + msg = batcher_receiver.next() => match msg.unwrap() { + batcher::Message::Constructed(Vote::Nullify(n)) + if n.view() == target_view => + break, + batcher::Message::Update { active, .. } => active.send(true).unwrap(), + _ => {} }, _ = context.sleep(Duration::from_secs(15)) => { panic!("expected nullify vote"); @@ -2822,7 +2826,10 @@ mod tests { let advanced = loop { select! { msg = batcher_receiver.next() => { - if let batcher::Message::Update { current, active, .. } = msg.unwrap() { + if let batcher::Message::Update { + current, active, .. + } = msg.unwrap() + { active.send(true).unwrap(); if current > target_view { break true; @@ -2834,7 +2841,10 @@ mod tests { }, } }; - assert!(advanced, "view should advance after certification (timeout case)"); + assert!( + advanced, + "view should advance after certification (timeout case)" + ); }); } @@ -2922,20 +2932,18 @@ mod tests { ); let leader = participants[1].clone(); let contents = (proposal.round, parent_payload, 0u64).encode(); - relay - .broadcast(&leader, (proposal.payload, contents)) - .await; + relay.broadcast(&leader, (proposal.payload, contents)).await; mailbox.proposal(proposal.clone()).await; // Wait for notarize vote loop { select! { - msg = batcher_receiver.next() => { - match msg.unwrap() { - batcher::Message::Constructed(Vote::Notarize(n)) if n.view() == target_view => break, - batcher::Message::Update { active, .. } => active.send(true).unwrap(), - _ => {} - } + msg = batcher_receiver.next() => match msg.unwrap() { + batcher::Message::Constructed(Vote::Notarize(n)) + if n.view() == target_view => + break, + batcher::Message::Update { active, .. } => active.send(true).unwrap(), + _ => {} }, _ = context.sleep(Duration::from_secs(5)) => { panic!("expected notarize vote"); @@ -2949,12 +2957,12 @@ mod tests { // Wait for nullify vote loop { select! { - msg = batcher_receiver.next() => { - match msg.unwrap() { - batcher::Message::Constructed(Vote::Nullify(n)) if n.view() == target_view => break, - batcher::Message::Update { active, .. } => active.send(true).unwrap(), - _ => {} - } + msg = batcher_receiver.next() => match msg.unwrap() { + batcher::Message::Constructed(Vote::Nullify(n)) + if n.view() == target_view => + break, + batcher::Message::Update { active, .. } => active.send(true).unwrap(), + _ => {} }, _ = context.sleep(Duration::from_secs(1)) => { panic!("expected nullify vote"); @@ -2972,7 +2980,10 @@ mod tests { let advanced = loop { select! { msg = batcher_receiver.next() => { - if let batcher::Message::Update { current, active, .. } = msg.unwrap() { + if let batcher::Message::Update { + current, active, .. + } = msg.unwrap() + { active.send(true).unwrap(); if current > target_view { break true; @@ -2984,7 +2995,10 @@ mod tests { }, } }; - assert!(advanced, "view should advance after certification (follower case)"); + assert!( + advanced, + "view should advance after certification (follower case)" + ); }); } @@ -3075,14 +3089,14 @@ mod tests { // As leader, wait for our own notarize vote (automaton will propose) let proposal = loop { select! { - msg = batcher_receiver.next() => { - match msg.unwrap() { - batcher::Message::Constructed(Vote::Notarize(n)) if n.view() == target_view => { - break n.proposal.clone(); - } - batcher::Message::Update { active, .. } => active.send(true).unwrap(), - _ => {} + msg = batcher_receiver.next() => match msg.unwrap() { + batcher::Message::Constructed(Vote::Notarize(n)) + if n.view() == target_view => + { + break n.proposal.clone(); } + batcher::Message::Update { active, .. } => active.send(true).unwrap(), + _ => {} }, _ = context.sleep(Duration::from_secs(5)) => { panic!("expected notarize vote as leader"); @@ -3096,12 +3110,12 @@ mod tests { // Wait for nullify vote loop { select! { - msg = batcher_receiver.next() => { - match msg.unwrap() { - batcher::Message::Constructed(Vote::Nullify(n)) if n.view() == target_view => break, - batcher::Message::Update { active, .. } => active.send(true).unwrap(), - _ => {} - } + msg = batcher_receiver.next() => match msg.unwrap() { + batcher::Message::Constructed(Vote::Nullify(n)) + if n.view() == target_view => + break, + batcher::Message::Update { active, .. } => active.send(true).unwrap(), + _ => {} }, _ = context.sleep(Duration::from_secs(1)) => { panic!("expected nullify vote"); @@ -3119,7 +3133,10 @@ mod tests { let advanced = loop { select! { msg = batcher_receiver.next() => { - if let batcher::Message::Update { current, active, .. } = msg.unwrap() { + if let batcher::Message::Update { + current, active, .. + } = msg.unwrap() + { active.send(true).unwrap(); if current > target_view { break true; @@ -3131,7 +3148,10 @@ mod tests { }, } }; - assert!(advanced, "view should advance after certification (leader case)"); + assert!( + advanced, + "view should advance after certification (leader case)" + ); }); } @@ -3379,12 +3399,11 @@ mod tests { // Wait for the first nullify vote (confirms stuck state) loop { select! { - msg = batcher_receiver.next() => { - match msg.unwrap() { - batcher::Message::Constructed(Vote::Nullify(n)) if n.view() == view_3 => break, - batcher::Message::Update { active, .. } => active.send(true).unwrap(), - _ => {} - } + msg = batcher_receiver.next() => match msg.unwrap() { + batcher::Message::Constructed(Vote::Nullify(n)) if n.view() == view_3 => + break, + batcher::Message::Update { active, .. } => active.send(true).unwrap(), + _ => {} }, _ = context.sleep(Duration::from_secs(10)) => { panic!("expected nullify vote for view 3"); @@ -3420,7 +3439,9 @@ mod tests { select! { msg = batcher_receiver.next() => { match msg.unwrap() { - batcher::Message::Update { current, active, .. } => { + batcher::Message::Update { + current, active, .. + } => { active.send(true).unwrap(); if current > view_3 { break true; @@ -3428,7 +3449,11 @@ mod tests { } batcher::Message::Constructed(Vote::Nullify(n)) => { // Still voting nullify for view 3 - expected - assert_eq!(n.view(), view_3, "should only vote nullify for stuck view"); + assert_eq!( + n.view(), + view_3, + "should only vote nullify for stuck view" + ); } _ => {} } @@ -3462,7 +3487,10 @@ mod tests { let rescued = loop { select! { msg = batcher_receiver.next() => { - if let batcher::Message::Update { current, active, .. } = msg.unwrap() { + if let batcher::Message::Update { + current, active, .. + } = msg.unwrap() + { active.send(true).unwrap(); if current > view_4 { break true; diff --git a/consensus/src/simplex/mocks/application.rs b/consensus/src/simplex/mocks/application.rs index 0cd1afa78e..8b1533229c 100644 --- a/consensus/src/simplex/mocks/application.rs +++ b/consensus/src/simplex/mocks/application.rs @@ -349,7 +349,11 @@ impl Application let digest = self.propose(context).await; response.send_lossy(digest); } - Message::Verify { context, payload, response } => { + Message::Verify { + context, + payload, + response, + } => { if let Some(contents) = seen.get(&payload) { let verified = self.verify(context, payload, contents.clone()).await; response.send_lossy(verified); @@ -360,7 +364,11 @@ impl Application .push((context, response)); } } - Message::Certify { round: _, payload, response } => { + Message::Certify { + round: _, + payload, + response, + } => { let contents = seen.get(&payload).cloned().unwrap_or_default(); // If certify returns None (Cancel mode), drop the sender without // responding, causing the receiver to return Err(Canceled). @@ -385,7 +393,7 @@ impl Application sender.send_lossy(verified); } } - } + }, } } } diff --git a/consensus/src/simplex/mod.rs b/consensus/src/simplex/mod.rs index fdce546c15..d9a491d5a5 100644 --- a/consensus/src/simplex/mod.rs +++ b/consensus/src/simplex/mod.rs @@ -1136,7 +1136,7 @@ mod tests { } } true - } + }, }; // Ensure no blocked connections @@ -2011,7 +2011,7 @@ mod tests { _timeout = context.sleep(Duration::from_secs(60)) => {}, _done = monitor.next() => { panic!("engine should not notarize or finalize anything"); - } + }, } }), ); @@ -2249,7 +2249,7 @@ mod tests { _timeout = context.sleep(Duration::from_secs(60)) => {}, _done = monitor.next() => { panic!("engine should not notarize or finalize anything"); - } + }, } }), ); diff --git a/examples/chat/src/handler.rs b/examples/chat/src/handler.rs index 189b7e2304..771e79f237 100644 --- a/examples/chat/src/handler.rs +++ b/examples/chat/src/handler.rs @@ -222,38 +222,60 @@ pub async fn run( Focus::Messages => Focus::Input, }; } - KeyCode::Up => { - match focused_window { - Focus::Logs => logs_scroll_vertical = logs_scroll_vertical.saturating_sub(1), - Focus::Metrics => metrics_scroll_vertical = metrics_scroll_vertical.saturating_sub(1), - Focus::Messages => messages_scroll_vertical = messages_scroll_vertical.saturating_sub(1), - _ => {} + KeyCode::Up => match focused_window { + Focus::Logs => { + logs_scroll_vertical = logs_scroll_vertical.saturating_sub(1) } - } - KeyCode::Down => { - match focused_window { - Focus::Logs => logs_scroll_vertical = logs_scroll_vertical.saturating_add(1), - Focus::Metrics => metrics_scroll_vertical = metrics_scroll_vertical.saturating_add(1), - Focus::Messages => messages_scroll_vertical = messages_scroll_vertical.saturating_add(1), - _ => {} + Focus::Metrics => { + metrics_scroll_vertical = metrics_scroll_vertical.saturating_sub(1) } - } - KeyCode::Left => { - match focused_window { - Focus::Logs => logs_scroll_horizontal = logs_scroll_horizontal.saturating_sub(1), - Focus::Metrics => metrics_scroll_horizontal = metrics_scroll_horizontal.saturating_sub(1), - Focus::Messages => messages_scroll_horizontal = messages_scroll_horizontal.saturating_sub(1), - _ => {} + Focus::Messages => { + messages_scroll_vertical = + messages_scroll_vertical.saturating_sub(1) } - } - KeyCode::Right => { - match focused_window { - Focus::Logs => logs_scroll_horizontal = logs_scroll_horizontal.saturating_add(1), - Focus::Metrics => metrics_scroll_horizontal = metrics_scroll_horizontal.saturating_add(1), - Focus::Messages => messages_scroll_horizontal = messages_scroll_horizontal.saturating_add(1), - _ => {} + _ => {} + }, + KeyCode::Down => match focused_window { + Focus::Logs => { + logs_scroll_vertical = logs_scroll_vertical.saturating_add(1) } - } + Focus::Metrics => { + metrics_scroll_vertical = metrics_scroll_vertical.saturating_add(1) + } + Focus::Messages => { + messages_scroll_vertical = + messages_scroll_vertical.saturating_add(1) + } + _ => {} + }, + KeyCode::Left => match focused_window { + Focus::Logs => { + logs_scroll_horizontal = logs_scroll_horizontal.saturating_sub(1) + } + Focus::Metrics => { + metrics_scroll_horizontal = + metrics_scroll_horizontal.saturating_sub(1) + } + Focus::Messages => { + messages_scroll_horizontal = + messages_scroll_horizontal.saturating_sub(1) + } + _ => {} + }, + KeyCode::Right => match focused_window { + Focus::Logs => { + logs_scroll_horizontal = logs_scroll_horizontal.saturating_add(1) + } + Focus::Metrics => { + metrics_scroll_horizontal = + metrics_scroll_horizontal.saturating_add(1) + } + Focus::Messages => { + messages_scroll_horizontal = + messages_scroll_horizontal.saturating_add(1) + } + _ => {} + }, KeyCode::Enter => { if input.is_empty() { continue; @@ -274,12 +296,15 @@ pub async fn run( } else { warn!(input, "dropped message"); } - let msg = Line::styled(format!( - "[{}] {}: {}", - chrono::Local::now().format("%m/%d %H:%M:%S"), - formatted_me, - input, - ), Style::default().fg(Color::Yellow)); + let msg = Line::styled( + format!( + "[{}] {}: {}", + chrono::Local::now().format("%m/%d %H:%M:%S"), + formatted_me, + input, + ), + Style::default().fg(Color::Yellow), + ); messages.push(msg); input = String::new(); } @@ -296,24 +321,25 @@ pub async fn run( } } }, - result = receiver.recv() => { - match result { - Ok((peer, msg)) => { - let peer = hex(&peer); - messages.push(format!( + result = receiver.recv() => match result { + Ok((peer, msg)) => { + let peer = hex(&peer); + messages.push( + format!( "[{}] {}**{}: {}", chrono::Local::now().format("%m/%d %H:%M:%S"), &peer[..4], &peer[peer.len() - 4..], String::from_utf8_lossy(msg.as_ref()) - ).into()); - } - Err(err) => { - debug!(?err, "failed to receive message"); - continue; - } + ) + .into(), + ); } - } + Err(err) => { + debug!(?err, "failed to receive message"); + continue; + } + }, }; } } diff --git a/examples/estimator/src/main.rs b/examples/estimator/src/main.rs index a6fe9f290b..160c1c4441 100644 --- a/examples/estimator/src/main.rs +++ b/examples/estimator/src/main.rs @@ -476,7 +476,7 @@ fn spawn_peer_jobs( }, _ = &mut listener => { break; - } + }, } (region, completions, maybe_proposer) diff --git a/examples/reshare/src/dkg/actor.rs b/examples/reshare/src/dkg/actor.rs index ee5585af84..c38b4e69de 100644 --- a/examples/reshare/src/dkg/actor.rs +++ b/examples/reshare/src/dkg/actor.rs @@ -359,9 +359,14 @@ where ) .await; if let Some(ack) = response { - let payload = Message::::Ack(ack).encode(); + let payload = + Message::::Ack(ack).encode(); if let Err(e) = round_sender - .send(Recipients::One(sender_pk.clone()), payload, true) + .send( + Recipients::One(sender_pk.clone()), + payload, + true, + ) .await { warn!(?epoch, dealer = ?sender_pk, ?e, "failed to send ack"); @@ -399,7 +404,9 @@ where } } MailboxMessage::Finalized { block, response } => { - let bounds = epocher.containing(block.height).expect("block height covered by epoch strategy"); + let bounds = epocher + .containing(block.height) + .expect("block height covered by epoch strategy"); let block_epoch = bounds.epoch(); let phase = bounds.phase(); let relative_height = bounds.relative(); @@ -458,39 +465,41 @@ where // Finalize the round before acknowledging let logs = storage.logs(epoch); - let (success, next_round, next_output, next_share) = - if let Some(ps) = player_state.take() { - match ps.finalize::(logs, &Sequential) { - Ok((new_output, new_share)) => ( - true, - epoch_state.round + 1, - Some(new_output), - Some(new_share), - ), - Err(_) => ( - false, - epoch_state.round, - epoch_state.output.clone(), - epoch_state.share.clone(), - ), - } - } else { - match observe::<_, _, N3f1>(round.clone(), logs, &Sequential) { - Ok(output) => (true, epoch_state.round + 1, Some(output), None), - Err(_) => ( - false, - epoch_state.round, - epoch_state.output.clone(), - epoch_state.share.clone(), - ), - } - }; + let (success, next_round, next_output, next_share) = if let Some(ps) = + player_state.take() + { + match ps.finalize::(logs, &Sequential) { + Ok((new_output, new_share)) => ( + true, + epoch_state.round + 1, + Some(new_output), + Some(new_share), + ), + Err(_) => ( + false, + epoch_state.round, + epoch_state.output.clone(), + epoch_state.share.clone(), + ), + } + } else { + match observe::<_, _, N3f1>(round.clone(), logs, &Sequential) { + Ok(output) => (true, epoch_state.round + 1, Some(output), None), + Err(_) => ( + false, + epoch_state.round, + epoch_state.output.clone(), + epoch_state.share.clone(), + ), + } + }; if success { info!(?epoch, "epoch succeeded"); self.successful_epochs.inc(); // Record reveals - let output = next_output.as_ref().expect("output exists on success"); + let output = + next_output.as_ref().expect("output exists on success"); let revealed = output.revealed(); self.all_reveals.inc_by(revealed.len() as u64); if revealed.position(&self_pk).is_some() { @@ -527,9 +536,7 @@ where }; // Exit the engine for this epoch now that the boundary is finalized - orchestrator - .exit(epoch) - .await; + orchestrator.exit(epoch).await; // If the update is stop, wait forever. if let PostUpdate::Stop = callback.on_update(update).await { diff --git a/examples/reshare/src/orchestrator/actor.rs b/examples/reshare/src/orchestrator/actor.rs index 1299a759be..cccc96d561 100644 --- a/examples/reshare/src/orchestrator/actor.rs +++ b/examples/reshare/src/orchestrator/actor.rs @@ -233,7 +233,9 @@ where %boundary_height, "received backup message from future epoch, ensuring boundary finalization" ); - self.marshal.hint_finalized(boundary_height, NonEmptyVec::new(from)).await; + self.marshal + .hint_finalized(boundary_height, NonEmptyVec::new(from)) + .await; }, transition = self.mailbox.next() => { let Some(transition) = transition else { diff --git a/examples/reshare/src/validator.rs b/examples/reshare/src/validator.rs index 33284c3f93..a995711e3a 100644 --- a/examples/reshare/src/validator.rs +++ b/examples/reshare/src/validator.rs @@ -600,11 +600,18 @@ mod test { failures += 1; (epoch, None) } - Update::Success { epoch, output, share } => { + Update::Success { + epoch, + output, + share, + } => { info!(epoch = ?epoch, pk = ?update.pk, ?output, "DKG success"); // Check if a delayed participant got an acknowledged share - if delayed.contains(&update.pk) && share.is_some() && output.revealed().position(&update.pk).is_none() { + if delayed.contains(&update.pk) + && share.is_some() + && output.revealed().position(&update.pk).is_none() + { info!(pk = ?update.pk, "delayed participant acknowledged"); delayed_acknowledged.insert(update.pk.clone()); } @@ -643,12 +650,9 @@ mod test { } else { PostUpdate::Continue }; - if update - .callback - .send(post_update) - .is_err() { - error!("update callback closed unexpectedly"); - continue; + if update.callback.send(post_update).is_err() { + error!("update callback closed unexpectedly"); + continue; } // Check if all active participants have reported @@ -720,21 +724,40 @@ mod test { info!(pk = ?pk, "restarting participant"); if team.output.is_none() { - team.start_one::(&ctx, &mut oracle, updates_in.clone(), pk).await; + team.start_one::( + &ctx, + &mut oracle, + updates_in.clone(), + pk, + ) + .await; } else { - team.start_one::, Random>(&ctx, &mut oracle, updates_in.clone(), pk).await; + team.start_one::, Random>( + &ctx, + &mut oracle, + updates_in.clone(), + pk, + ) + .await; } }, _ = crash_receiver.next() => { // Crash ticker fired (only for Random crashes) - let Some(Crash::Random { count, downtime, .. }) = &self.crash else { + let Some(Crash::Random { + count, downtime, .. + }) = &self.crash + else { continue; }; // Pick multiple random participants to crash - let all_participants: Vec = team.participants.keys().cloned().collect(); + let all_participants: Vec = + team.participants.keys().cloned().collect(); let crash_count = (*count).min(all_participants.len()); - let to_crash: Vec = all_participants.choose_multiple(&mut ctx, crash_count).cloned().collect(); + let to_crash: Vec = all_participants + .choose_multiple(&mut ctx, crash_count) + .cloned() + .collect(); for pk in to_crash { // Try to abort the handle if it exists let Some(handle) = team.handles.remove(&pk) else { diff --git a/examples/sync/src/bin/server.rs b/examples/sync/src/bin/server.rs index b3d13e142c..f737dd2748 100644 --- a/examples/sync/src/bin/server.rs +++ b/examples/sync/src/bin/server.rs @@ -356,7 +356,7 @@ where // Channel closed return Ok(()); } - } + }, } Ok(()) @@ -455,7 +455,7 @@ where error!(?err, "❌ failed to accept client"); } } - } + }, } Ok(()) diff --git a/examples/sync/src/net/io.rs b/examples/sync/src/net/io.rs index fc398d43c4..25f4028633 100644 --- a/examples/sync/src/net/io.rs +++ b/examples/sync/src/net/io.rs @@ -40,21 +40,22 @@ async fn run_loop( on_stopped => { debug!("context shutdown, terminating I/O task"); }, - outgoing = request_rx.next() => { - match outgoing { - Some(Request { request, response_tx }) => { - let request_id = request.request_id(); - pending_requests.insert(request_id, response_tx); - let data = request.encode(); - if let Err(e) = send_frame(&mut sink, data, MAX_MESSAGE_SIZE).await { - if let Some(sender) = pending_requests.remove(&request_id) { - let _ = sender.send(Err(Error::Network(e))); - } - return; + outgoing = request_rx.next() => match outgoing { + Some(Request { + request, + response_tx, + }) => { + let request_id = request.request_id(); + pending_requests.insert(request_id, response_tx); + let data = request.encode(); + if let Err(e) = send_frame(&mut sink, data, MAX_MESSAGE_SIZE).await { + if let Some(sender) = pending_requests.remove(&request_id) { + let _ = sender.send(Err(Error::Network(e))); } - }, - None => return, + return; + } } + None => return, }, incoming = recv_frame(&mut stream, MAX_MESSAGE_SIZE) => { match incoming { @@ -65,10 +66,10 @@ async fn run_loop( if let Some(sender) = pending_requests.remove(&request_id) { let _ = sender.send(Ok(message)); } - }, + } Err(_) => { /* ignore */ } } - }, + } Err(_e) => { for (_, sender) in pending_requests.drain() { let _ = sender.send(Err(Error::RequestChannelClosed)); @@ -76,7 +77,7 @@ async fn run_loop( return; } } - } + }, } } diff --git a/macros/tests/select.rs b/macros/tests/select.rs index 4ca9695838..2d51470ad5 100644 --- a/macros/tests/select.rs +++ b/macros/tests/select.rs @@ -74,11 +74,9 @@ mod tests { select_loop! { mock_context, on_stopped => {}, - msg = rx.next() => { - match msg { - Some(v) => received.push(v), - None => break, - } + msg = rx.next() => match msg { + Some(v) => received.push(v), + None => break, }, } assert_eq!(received, vec![1, 2, 3]); @@ -124,12 +122,10 @@ mod tests { select_loop! { mock_context, on_stopped => {}, - msg = rx.next() => { - match msg { - Some(v) if v % 2 != 0 => continue, - Some(v) => evens.push(v), - None => break, - } + msg = rx.next() => match msg { + Some(v) if v % 2 != 0 => continue, + Some(v) => evens.push(v), + None => break, }, } assert_eq!(evens, vec![2, 4]); @@ -197,11 +193,9 @@ mod tests { start_count += 1; }, on_stopped => {}, - msg = rx.next() => { - match msg { - Some(v) => received.push(v), - None => break, - } + msg = rx.next() => match msg { + Some(v) => received.push(v), + None => break, }, on_end => { end_count += 1; @@ -240,11 +234,9 @@ mod tests { } }, on_stopped => {}, - msg = rx.next() => { - match msg { - Some(v) => received.push(v), - None => break, - } + msg = rx.next() => match msg { + Some(v) => received.push(v), + None => break, }, on_end => { end_count += 1; @@ -360,8 +352,10 @@ mod tests { drop(tx3); select! { - msg = rx3.next() => if let Some(v) = msg { - results.push(v); + msg = rx3.next() => { + if let Some(v) = msg { + results.push(v); + } }, } diff --git a/p2p/src/authenticated/discovery/actors/dialer.rs b/p2p/src/authenticated/discovery/actors/dialer.rs index e6145239a8..a8d1a69060 100644 --- a/p2p/src/authenticated/discovery/actors/dialer.rs +++ b/p2p/src/authenticated/discovery/actors/dialer.rs @@ -168,10 +168,7 @@ impl { // Update the deadline. - dial_deadline = dial_deadline.add_jittered( - &mut self.context, - self.dial_frequency, - ); + dial_deadline = dial_deadline.add_jittered(&mut self.context, self.dial_frequency); // Pop the queue until we can reserve a peer. // If a peer is reserved, attempt to dial it. @@ -186,10 +183,8 @@ impl { // Update the deadline. - query_deadline = query_deadline.add_jittered( - &mut self.context, - self.query_frequency, - ); + query_deadline = + query_deadline.add_jittered(&mut self.context, self.query_frequency); // Only update the queue if it is empty. if self.queue.is_empty() { @@ -275,21 +270,22 @@ mod tests { let deadline = context.current() + dial_frequency * 3; loop { select! { - msg = tracker_rx.next() => { - match msg { - Some(tracker::Message::Dialable { responder }) => { - let _ = responder.send(peers.clone()); - } - Some(tracker::Message::Dial { public_key, reservation }) => { - dial_count += 1; - let ingress: Ingress = - SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8000).into(); - let metadata = Metadata::Dialer(public_key, ingress); - let res = tracker::Reservation::new(metadata, releaser.clone()); - let _ = reservation.send(Some(res)); - } - _ => {} + msg = tracker_rx.next() => match msg { + Some(tracker::Message::Dialable { responder }) => { + let _ = responder.send(peers.clone()); } + Some(tracker::Message::Dial { + public_key, + reservation, + }) => { + dial_count += 1; + let ingress: Ingress = + SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8000).into(); + let metadata = Metadata::Dialer(public_key, ingress); + let res = tracker::Reservation::new(metadata, releaser.clone()); + let _ = reservation.send(Some(res)); + } + _ => {} }, _ = context.sleep_until(deadline) => break, } diff --git a/p2p/src/authenticated/discovery/actors/listener.rs b/p2p/src/authenticated/discovery/actors/listener.rs index 42057d25ef..da42e22886 100644 --- a/p2p/src/authenticated/discovery/actors/listener.rs +++ b/p2p/src/authenticated/discovery/actors/listener.rs @@ -246,7 +246,7 @@ impl Actor Actor { .await?; // Send/Receive messages from the peer - let mut send_handler: Handle> = self.context.with_label("sender").spawn( { - let peer = peer.clone(); - let mut tracker = tracker.clone(); - let mailbox = self.mailbox.clone(); - let rate_limits = rate_limits.clone(); - move |context| async move { - // Set the initial deadline to now to start gossiping immediately - let mut deadline = context.current(); - - // Enter into the main loop - select_loop! { - context, - on_stopped => {}, - _ = context.sleep_until(deadline) => { - // Get latest bitset from tracker (also used as ping) - tracker.construct(peer.clone(), mailbox.clone()); - - // Reset ticker - deadline = context.current() + self.gossip_bit_vec_frequency; - }, - msg_control = self.control.next() => { - let msg = match msg_control { - Some(msg_control) => msg_control, - None => return Err(Error::PeerDisconnected), - }; - let (metric, payload) = match msg { - Message::BitVec(bit_vec) => - (metrics::Message::new_bit_vec(&peer), types::Payload::BitVec(bit_vec)), - Message::Peers(peers) => - (metrics::Message::new_peers(&peer), types::Payload::Peers(peers)), - Message::Kill => { - return Err(Error::PeerKilled(peer.to_string())) - } - }; - Self::send_payload(&mut conn_sender, &self.sent_messages, metric, payload) + let mut send_handler: Handle> = + self.context.with_label("sender").spawn({ + let peer = peer.clone(); + let mut tracker = tracker.clone(); + let mailbox = self.mailbox.clone(); + let rate_limits = rate_limits.clone(); + move |context| async move { + // Set the initial deadline to now to start gossiping immediately + let mut deadline = context.current(); + + // Enter into the main loop + select_loop! { + context, + on_stopped => {}, + _ = context.sleep_until(deadline) => { + // Get latest bitset from tracker (also used as ping) + tracker.construct(peer.clone(), mailbox.clone()); + + // Reset ticker + deadline = context.current() + self.gossip_bit_vec_frequency; + }, + msg_control = self.control.next() => { + let msg = match msg_control { + Some(msg_control) => msg_control, + None => return Err(Error::PeerDisconnected), + }; + let (metric, payload) = match msg { + Message::BitVec(bit_vec) => ( + metrics::Message::new_bit_vec(&peer), + types::Payload::BitVec(bit_vec), + ), + Message::Peers(peers) => ( + metrics::Message::new_peers(&peer), + types::Payload::Peers(peers), + ), + Message::Kill => return Err(Error::PeerKilled(peer.to_string())), + }; + Self::send_payload( + &mut conn_sender, + &self.sent_messages, + metric, + payload, + ) .await?; - }, - msg_high = self.high.next() => { - // Data is already pre-encoded, just forward to stream - let encoded = Self::validate_outbound_msg(msg_high, &rate_limits)?; - Self::send_encoded(&mut conn_sender, &self.sent_messages, metrics::Message::new_data(&peer, encoded.channel), encoded.payload) + }, + msg_high = self.high.next() => { + // Data is already pre-encoded, just forward to stream + let encoded = Self::validate_outbound_msg(msg_high, &rate_limits)?; + Self::send_encoded( + &mut conn_sender, + &self.sent_messages, + metrics::Message::new_data(&peer, encoded.channel), + encoded.payload, + ) .await?; - }, - msg_low = self.low.next() => { - // Data is already pre-encoded, just forward to stream - let encoded = Self::validate_outbound_msg(msg_low, &rate_limits)?; - Self::send_encoded(&mut conn_sender, &self.sent_messages, metrics::Message::new_data(&peer, encoded.channel), encoded.payload) + }, + msg_low = self.low.next() => { + // Data is already pre-encoded, just forward to stream + let encoded = Self::validate_outbound_msg(msg_low, &rate_limits)?; + Self::send_encoded( + &mut conn_sender, + &self.sent_messages, + metrics::Message::new_data(&peer, encoded.channel), + encoded.payload, + ) .await?; + }, } - } - Ok(()) - } - }); + Ok(()) + } + }); let mut receive_handler: Handle> = self .context .with_label("receiver") @@ -349,12 +367,8 @@ impl Actor { debug!("context shutdown, stopping peer"); Ok(Ok(())) }, - send_result = &mut send_handler => { - send_result - }, - receive_result = &mut receive_handler => { - receive_result - } + send_result = &mut send_handler => send_result, + receive_result = &mut receive_handler => receive_result, }; // Parse result diff --git a/p2p/src/authenticated/discovery/actors/router/actor.rs b/p2p/src/authenticated/discovery/actors/router/actor.rs index 398193df21..f36e83604d 100644 --- a/p2p/src/authenticated/discovery/actors/router/actor.rs +++ b/p2p/src/authenticated/discovery/actors/router/actor.rs @@ -163,7 +163,7 @@ impl Actor { let _ = response.send(receiver); } } - } + }, } } diff --git a/p2p/src/authenticated/discovery/actors/spawner/actor.rs b/p2p/src/authenticated/discovery/actors/spawner/actor.rs index b03faf6179..a2fcec9eb1 100644 --- a/p2p/src/authenticated/discovery/actors/spawner/actor.rs +++ b/p2p/src/authenticated/discovery/actors/spawner/actor.rs @@ -162,7 +162,8 @@ impl Actor { break; }; self.handle_msg(msg).await; - } + }, } } diff --git a/p2p/src/authenticated/discovery/mod.rs b/p2p/src/authenticated/discovery/mod.rs index b7f1dd4361..419f21397d 100644 --- a/p2p/src/authenticated/discovery/mod.rs +++ b/p2p/src/authenticated/discovery/mod.rs @@ -910,7 +910,7 @@ mod tests { // Channel closed due to shutdown break; } - } + }, } } }); @@ -1206,8 +1206,12 @@ mod tests { }); select! { - receiver = receiver => { panic!("receiver exited: {receiver:?}") }, - sender = sender => { panic!("sender exited: {sender:?}") }, + receiver = receiver => { + panic!("receiver exited: {receiver:?}") + }, + sender = sender => { + panic!("sender exited: {sender:?}") + }, } } }); @@ -1433,8 +1437,12 @@ mod tests { }); select! { - receiver = receiver => { panic!("receiver exited: {receiver:?}") }, - sender = sender => { panic!("sender exited: {sender:?}") }, + receiver = receiver => { + panic!("receiver exited: {receiver:?}") + }, + sender = sender => { + panic!("sender exited: {sender:?}") + }, } } }); @@ -1523,7 +1531,7 @@ mod tests { }, _ = context.sleep(Duration::from_secs(1)) => { // Expected: timeout with no message - } + }, } }); } diff --git a/p2p/src/authenticated/lookup/actors/dialer.rs b/p2p/src/authenticated/lookup/actors/dialer.rs index 1e42922713..80ea639cf4 100644 --- a/p2p/src/authenticated/lookup/actors/dialer.rs +++ b/p2p/src/authenticated/lookup/actors/dialer.rs @@ -172,10 +172,7 @@ impl { // Update the deadline. - dial_deadline = dial_deadline.add_jittered( - &mut self.context, - self.dial_frequency, - ); + dial_deadline = dial_deadline.add_jittered(&mut self.context, self.dial_frequency); // Pop the queue until we can reserve a peer. // If a peer is reserved, attempt to dial it. @@ -190,10 +187,8 @@ impl { // Update the deadline. - query_deadline = query_deadline.add_jittered( - &mut self.context, - self.query_frequency, - ); + query_deadline = + query_deadline.add_jittered(&mut self.context, self.query_frequency); // Only update the queue if it is empty. if self.queue.is_empty() { @@ -202,7 +197,7 @@ impl { - match msg { - Some(tracker::Message::Dialable { responder }) => { - let _ = responder.send(peers.clone()); - } - Some(tracker::Message::Dial { public_key, reservation }) => { - dial_count += 1; - let metadata = Metadata::Dialer(public_key); - let res = tracker::Reservation::new(metadata, releaser.clone()); - let ingress: Ingress = - SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8000).into(); - let _ = reservation.send(Some((res, ingress))); - } - _ => {} + msg = tracker_rx.next() => match msg { + Some(tracker::Message::Dialable { responder }) => { + let _ = responder.send(peers.clone()); + } + Some(tracker::Message::Dial { + public_key, + reservation, + }) => { + dial_count += 1; + let metadata = Metadata::Dialer(public_key); + let res = tracker::Reservation::new(metadata, releaser.clone()); + let ingress: Ingress = + SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8000).into(); + let _ = reservation.send(Some((res, ingress))); } + _ => {} }, _ = context.sleep_until(deadline) => break, } diff --git a/p2p/src/authenticated/lookup/actors/listener.rs b/p2p/src/authenticated/lookup/actors/listener.rs index aaa0c8eee8..006d3578b1 100644 --- a/p2p/src/authenticated/lookup/actors/listener.rs +++ b/p2p/src/authenticated/lookup/actors/listener.rs @@ -262,8 +262,13 @@ impl Actor Actor { let ping_rate_limiter = RateLimiter::direct_with_clock(ping_rate, self.context.clone()); // Send/Receive messages from the peer - let mut send_handler: Handle> = self.context.with_label("sender").spawn( { - let peer = peer.clone(); - let rate_limits = rate_limits.clone(); - move |context| async move { - // Set the initial deadline (no need to send right away) - let mut deadline = context.current() + self.ping_frequency; + let mut send_handler: Handle> = + self.context.with_label("sender").spawn({ + let peer = peer.clone(); + let rate_limits = rate_limits.clone(); + move |context| async move { + // Set the initial deadline (no need to send right away) + let mut deadline = context.current() + self.ping_frequency; - // Enter into the main loop - select_loop! { - context, - on_stopped => {}, - _ = context.sleep_until(deadline) => { - // Periodically send a ping to the peer - Self::send_payload( - &mut conn_sender, - &self.sent_messages, - metrics::Message::new_ping(&peer), - types::Message::Ping, - ).await?; + // Enter into the main loop + select_loop! { + context, + on_stopped => {}, + _ = context.sleep_until(deadline) => { + // Periodically send a ping to the peer + Self::send_payload( + &mut conn_sender, + &self.sent_messages, + metrics::Message::new_ping(&peer), + types::Message::Ping, + ) + .await?; - // Reset ticker - deadline = context.current() + self.ping_frequency; - }, - msg_control = self.control.next() => { - let msg = match msg_control { - Some(msg_control) => msg_control, - None => return Err(Error::PeerDisconnected), - }; - match msg { - Message::Kill => { - return Err(Error::PeerKilled(peer.to_string())) + // Reset ticker + deadline = context.current() + self.ping_frequency; + }, + msg_control = self.control.next() => { + let msg = match msg_control { + Some(msg_control) => msg_control, + None => return Err(Error::PeerDisconnected), + }; + match msg { + Message::Kill => return Err(Error::PeerKilled(peer.to_string())), } - } - }, - msg_high = self.high.next() => { - // Data is already pre-encoded, just forward to stream - let encoded = Self::validate_outbound_msg(msg_high, &rate_limits)?; - Self::send_encoded(&mut conn_sender, &self.sent_messages, metrics::Message::new_data(&peer, encoded.channel), encoded.payload) + }, + msg_high = self.high.next() => { + // Data is already pre-encoded, just forward to stream + let encoded = Self::validate_outbound_msg(msg_high, &rate_limits)?; + Self::send_encoded( + &mut conn_sender, + &self.sent_messages, + metrics::Message::new_data(&peer, encoded.channel), + encoded.payload, + ) .await?; - }, - msg_low = self.low.next() => { - // Data is already pre-encoded, just forward to stream - let encoded = Self::validate_outbound_msg(msg_low, &rate_limits)?; - Self::send_encoded(&mut conn_sender, &self.sent_messages, metrics::Message::new_data(&peer, encoded.channel), encoded.payload) + }, + msg_low = self.low.next() => { + // Data is already pre-encoded, just forward to stream + let encoded = Self::validate_outbound_msg(msg_low, &rate_limits)?; + Self::send_encoded( + &mut conn_sender, + &self.sent_messages, + metrics::Message::new_data(&peer, encoded.channel), + encoded.payload, + ) .await?; + }, } - } - Ok(()) - } - }); + Ok(()) + } + }); let mut receive_handler: Handle> = self .context .with_label("receiver") @@ -257,12 +267,8 @@ impl Actor { debug!("context shutdown, stopping peer"); Ok(Ok(())) }, - send_result = &mut send_handler => { - send_result - }, - receive_result = &mut receive_handler => { - receive_result - } + send_result = &mut send_handler => send_result, + receive_result = &mut receive_handler => receive_result, }; // Parse result diff --git a/p2p/src/authenticated/lookup/actors/router/actor.rs b/p2p/src/authenticated/lookup/actors/router/actor.rs index 062d73b64a..95c4e697e8 100644 --- a/p2p/src/authenticated/lookup/actors/router/actor.rs +++ b/p2p/src/authenticated/lookup/actors/router/actor.rs @@ -162,7 +162,7 @@ impl Actor { let _ = response.send(receiver); } } - } + }, } } diff --git a/p2p/src/authenticated/lookup/actors/spawner/actor.rs b/p2p/src/authenticated/lookup/actors/spawner/actor.rs index c743ada589..9ca2c186a1 100644 --- a/p2p/src/authenticated/lookup/actors/spawner/actor.rs +++ b/p2p/src/authenticated/lookup/actors/spawner/actor.rs @@ -138,7 +138,8 @@ impl Actor { }, _ = self.directory.wait_for_unblock() => { if self.directory.unblock_expired() { - self.listener.0.send_lossy(self.directory.listenable()).await; + self.listener + .0 + .send_lossy(self.directory.listenable()) + .await; } }, msg = self.receiver.next() => { @@ -123,7 +126,7 @@ impl Actor { break; }; self.handle_msg(msg).await; - } + }, } } diff --git a/p2p/src/authenticated/lookup/mod.rs b/p2p/src/authenticated/lookup/mod.rs index 94f24f338f..8dd6749245 100644 --- a/p2p/src/authenticated/lookup/mod.rs +++ b/p2p/src/authenticated/lookup/mod.rs @@ -812,7 +812,7 @@ mod tests { _ = context.stopped() => { // Graceful shutdown signal received break; - } + }, } } } @@ -1112,8 +1112,12 @@ mod tests { }); select! { - receiver = receiver => { panic!("receiver exited: {receiver:?}") }, - sender = sender_task => { panic!("sender exited: {sender:?}") }, + receiver = receiver => { + panic!("receiver exited: {receiver:?}") + }, + sender = sender_task => { + panic!("sender exited: {sender:?}") + }, } } }); @@ -1242,8 +1246,12 @@ mod tests { }); select! { - receiver = receiver => { panic!("receiver exited: {receiver:?}") }, - sender = sender_task => { panic!("sender exited: {sender:?}") }, + receiver = receiver => { + panic!("receiver exited: {receiver:?}") + }, + sender = sender_task => { + panic!("sender exited: {sender:?}") + }, } } }); @@ -1335,7 +1343,7 @@ mod tests { }, _ = context.sleep(Duration::from_secs(1)) => { // Expected: timeout with no message - } + }, } }); } diff --git a/p2p/src/simulated/network.rs b/p2p/src/simulated/network.rs index cb0d46ff59..20897ab5c5 100644 --- a/p2p/src/simulated/network.rs +++ b/p2p/src/simulated/network.rs @@ -852,7 +852,7 @@ impl Sender { Some(task) => task, None => break, }; - } + }, } // Send task @@ -1114,14 +1114,20 @@ impl Peer

{ // Listen for control messages, which are used to register channels control = control_receiver.next() => { // If control is closed, exit - let (channel, sender, result_tx): (Channel, Handle<()>, oneshot::Sender>) = match control { + let (channel, sender, result_tx): ( + Channel, + Handle<()>, + oneshot::Sender>, + ) = match control { Some(control) => control, None => break, }; // Register channel let (receiver_tx, receiver_rx) = mpsc::unbounded(); - if let Some((_, existing_sender)) = mailboxes.insert(channel, (receiver_tx, sender)) { + if let Some((_, existing_sender)) = + mailboxes.insert(channel, (receiver_tx, sender)) + { warn!(?public_key, ?channel, "overwriting existing channel"); existing_sender.abort(); } diff --git a/p2p/src/utils/mux.rs b/p2p/src/utils/mux.rs index f6f34220f9..7c69365e9e 100644 --- a/p2p/src/utils/mux.rs +++ b/p2p/src/utils/mux.rs @@ -129,11 +129,11 @@ impl Muxer { let (tx, rx) = mpsc::channel(self.mailbox_size); self.routes.insert(subchannel, tx); let _ = sender.send(rx); - }, + } Some(Control::Deregister { subchannel }) => { // Remove the route. self.routes.remove(&subchannel); - }, + } None => { // If the control channel is closed, we can shut down since there must // be no more registrations, and all receivers must have been dropped. @@ -180,7 +180,7 @@ impl Muxer { debug!(?subchannel, "subchannel full, dropping message"); } } - } + }, } Ok(()) diff --git a/resolver/src/p2p/engine.rs b/resolver/src/p2p/engine.rs index 640f8f80ed..d5137d0d81 100644 --- a/resolver/src/p2p/engine.rs +++ b/resolver/src/p2p/engine.rs @@ -245,7 +245,8 @@ impl< // Only start new fetch if not already in progress if is_new { - self.fetch_timers.insert(key.clone(), self.metrics.fetch_duration.timer()); + self.fetch_timers + .insert(key.clone(), self.metrics.fetch_duration.timer()); self.fetcher.add_ready(key); } else { trace!(?key, "updated targets for existing fetch"); @@ -269,7 +270,10 @@ impl< // Clean up timers and notify consumer let before = self.fetch_timers.len(); - let removed = self.fetch_timers.extract_if(|k, _| !predicate(k)).collect::>(); + let removed = self + .fetch_timers + .extract_if(|k, _| !predicate(k)) + .collect::>(); for (key, timer) in removed { timer.cancel(); self.consumer.failed(key, ()).await; @@ -308,7 +312,12 @@ impl< }, // Handle completed server requests serve = self.serves.next_completed() => { - let Serve { timer, peer, id, result } = serve; + let Serve { + timer, + peer, + id, + result, + } = serve; // Metrics and logs match result { @@ -323,7 +332,8 @@ impl< } // Send response to peer - self.handle_serve(&mut sender, peer, id, result, self.priority_responses).await; + self.handle_serve(&mut sender, peer, id, result, self.priority_responses) + .await; }, // Handle network messages msg = receiver.recv() => { @@ -345,8 +355,12 @@ impl< } }; match msg.payload { - wire::Payload::Request(key) => self.handle_network_request(peer, msg.id, key).await, - wire::Payload::Response(response) => self.handle_network_response(peer, msg.id, response).await, + wire::Payload::Request(key) => { + self.handle_network_request(peer, msg.id, key).await + } + wire::Payload::Response(response) => { + self.handle_network_response(peer, msg.id, response).await + } wire::Payload::Error => self.handle_network_error_response(peer, msg.id).await, }; }, diff --git a/resolver/src/p2p/mod.rs b/resolver/src/p2p/mod.rs index b88f8bcf33..ea67dc99cf 100644 --- a/resolver/src/p2p/mod.rs +++ b/resolver/src/p2p/mod.rs @@ -562,7 +562,9 @@ mod tests { // Cancel before sending the fetch request, expecting no effect mailbox1.cancel(key.clone()).await; select! { - _ = cons_out1.next() => { panic!("unexpected event"); }, + _ = cons_out1.next() => { + panic!("unexpected event"); + }, _ = context.sleep(Duration::from_millis(100)) => {}, }; @@ -580,7 +582,9 @@ mod tests { // Attempt to cancel after data has been delivered, expecting no effect mailbox1.cancel(key.clone()).await; select! { - _ = cons_out1.next() => { panic!("unexpected event"); }, + _ = cons_out1.next() => { + panic!("unexpected event"); + }, _ = context.sleep(Duration::from_millis(100)) => {}, }; @@ -1385,7 +1389,9 @@ mod tests { // Retain before fetching should have no effect mailbox1.retain(|_| true).await; select! { - _ = cons_out1.next() => { panic!("unexpected event"); }, + _ = cons_out1.next() => { + panic!("unexpected event"); + }, _ = context.sleep(Duration::from_millis(100)) => {}, }; @@ -1466,7 +1472,9 @@ mod tests { // Clear before fetching should have no effect mailbox1.clear().await; select! { - _ = cons_out1.next() => { panic!("unexpected event"); }, + _ = cons_out1.next() => { + panic!("unexpected event"); + }, _ = context.sleep(Duration::from_millis(100)) => {}, }; diff --git a/runtime/src/deterministic.rs b/runtime/src/deterministic.rs index be97f4c7a7..ae9081d26e 100644 --- a/runtime/src/deterministic.rs +++ b/runtime/src/deterministic.rs @@ -1074,9 +1074,7 @@ impl crate::Spawner for Context { result.map_err(|_| Error::Closed)?; Ok(()) }, - _ = timeout_future => { - Err(Error::Timeout) - } + _ = timeout_future => Err(Error::Timeout), } } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 13b703c0b2..f3b1e9de6b 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -480,12 +480,8 @@ pub trait Clock: { async move { select! { - result = future => { - Ok(result) - }, - _ = self.sleep(duration) => { - Err(Error::Timeout) - }, + result = future => Ok(result), + _ = self.sleep(duration) => Err(Error::Timeout), } } } diff --git a/runtime/src/mocks.rs b/runtime/src/mocks.rs index fa8526b1a9..12642b5f6c 100644 --- a/runtime/src/mocks.rs +++ b/runtime/src/mocks.rs @@ -314,9 +314,7 @@ mod tests { v = stream.recv(5) => { panic!("unexpected value: {v:?}"); }, - _ = context.sleep(Duration::from_millis(100)) => { - "timeout" - }, + _ = context.sleep(Duration::from_millis(100)) => "timeout", }; }); } diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index 41fad15fb8..b2abb1cb5d 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -515,9 +515,7 @@ impl crate::Spawner for Context { result.map_err(|_| Error::Closed)?; Ok(()) }, - _ = timeout_future => { - Err(Error::Timeout) - } + _ = timeout_future => Err(Error::Timeout), } } diff --git a/storage/src/qmdb/sync/engine.rs b/storage/src/qmdb/sync/engine.rs index f7155595b7..37c680233d 100644 --- a/storage/src/qmdb/sync/engine.rs +++ b/storage/src/qmdb/sync/engine.rs @@ -65,9 +65,10 @@ async fn wait_for_event( ); select! { - target = target_update_fut => { - target.map_or_else(|| Some(Event::UpdateChannelClosed), |target| Some(Event::TargetUpdate(target))) - }, + target = target_update_fut => target.map_or_else( + || Some(Event::UpdateChannelClosed), + |target| Some(Event::TargetUpdate(target)) + ), result = outstanding_requests.futures_mut().next() => { result.map(|fetch_result| Event::BatchReceived(fetch_result)) }, diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 035303e419..7d363d5382 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -219,8 +219,8 @@ pub async fn dial( }; select! { - x = inner_routine => { x } , - _ = timeout => { Err(Error::HandshakeTimeout) } + x = inner_routine => x, + _ = timeout => Err(Error::HandshakeTimeout), } } @@ -286,8 +286,8 @@ pub async fn listen< }; select! { - x = inner_routine => { x } , - _ = timeout => { Err(Error::HandshakeTimeout) } + x = inner_routine => x, + _ = timeout => Err(Error::HandshakeTimeout), } }