From 4dd98d8700aee67f8911192a92b7cf180abb8c0c Mon Sep 17 00:00:00 2001 From: Tim Ling <791016+kettlebell@users.noreply.github.com> Date: Fri, 28 Jul 2023 18:21:59 +1000 Subject: [PATCH 1/2] Remove `async_trait` requirements in diffusion.rs --- spectrum-crypto/src/lib.rs | 3 +- spectrum-network/src/lib.rs | 6 +- .../src/protocol_handler/diffusion.rs | 218 +++++++++--------- .../src/protocol_handler/multicasting.rs | 32 +-- spectrum-network/src/protocol_handler/pool.rs | 8 +- 5 files changed, 138 insertions(+), 129 deletions(-) diff --git a/spectrum-crypto/src/lib.rs b/spectrum-crypto/src/lib.rs index ad75d09b..5029a4f7 100644 --- a/spectrum-crypto/src/lib.rs +++ b/spectrum-crypto/src/lib.rs @@ -1,4 +1,4 @@ -use async_trait::async_trait; +#![feature(async_fn_in_trait)] pub mod digest; mod hash; @@ -15,7 +15,6 @@ pub trait VerifiableAgainst

{ pub struct Verified(pub S); /// Some statement which can be verified against public data `P`. -#[async_trait] pub trait AsyncVerifiable

: Send + Sync + Sized { type Err: Send; async fn verify(self, public_data: &P) -> Result, Self::Err>; diff --git a/spectrum-network/src/lib.rs b/spectrum-network/src/lib.rs index 73610c7d..2bced3a5 100644 --- a/spectrum-network/src/lib.rs +++ b/spectrum-network/src/lib.rs @@ -1,9 +1,11 @@ +#![feature(async_fn_in_trait)] + pub mod network_controller; +pub mod one_shot_upgrade; pub mod peer_conn_handler; pub mod peer_manager; pub mod protocol; +pub mod protocol_api; pub mod protocol_handler; pub mod protocol_upgrade; pub mod types; -pub mod protocol_api; -pub mod one_shot_upgrade; diff --git a/spectrum-network/src/protocol_handler/diffusion.rs b/spectrum-network/src/protocol_handler/diffusion.rs index df368e4d..d350acdd 100644 --- a/spectrum-network/src/protocol_handler/diffusion.rs +++ b/spectrum-network/src/protocol_handler/diffusion.rs @@ -67,13 +67,11 @@ enum DiffusionBehaviourIn { }, } -#[async_trait::async_trait] trait DiffusionStateWrite { async fn update_peer(&self, peer_id: PeerId, peer_state: SyncState); async fn update_modifier(&self, modifier_id: ModifierId, status: ModifierStatus); } -#[async_trait::async_trait] impl DiffusionStateWrite for Sender> { async fn update_peer(&self, peer_id: PeerId, peer_state: SyncState) { self.send(FromTask::ToBehaviour(DiffusionBehaviourIn::UpdatePeer { @@ -94,12 +92,10 @@ impl DiffusionStateWrite for Sender ModifierStatus; } -#[async_trait::async_trait] impl DiffusionStateRead for Sender> { async fn modifier_status(&self, modifier: ModifierId) -> ModifierStatus { let (snd, recv) = oneshot::channel(); @@ -179,74 +175,78 @@ where fn on_sync(&mut self, peer_id: PeerId, peer_status: SyncStatus, initial: bool) { let service = self.remote_sync.clone(); let conf = self.conf; - self.tasks.spawn(|to_behaviour| async move { - let peer_state = service.remote_state(peer_status).await; - to_behaviour.update_peer(peer_id, peer_state.clone()).await; - if initial { - to_behaviour - .send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction( - NetworkAction::EnablePeer { - peer_id, - handshakes: service.make_poly_handshake().await, - }, - ))) - .await - .unwrap(); - } - match peer_state.cmp { - RemoteChainCmp::Equal | RemoteChainCmp::Nonsense => {} - RemoteChainCmp::Longer(None) | RemoteChainCmp::Fork(None) => { - if !initial { - // sync is alerady included into handshake if initial + self.tasks.spawn(|to_behaviour| { + Box::pin(async move { + let peer_state = service.remote_state(peer_status).await; + to_behaviour.update_peer(peer_id, peer_state.clone()).await; + if initial { + to_behaviour + .send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction( + NetworkAction::EnablePeer { + peer_id, + handshakes: service.make_poly_handshake().await, + }, + ))) + .await + .unwrap(); + } + match peer_state.cmp { + RemoteChainCmp::Equal | RemoteChainCmp::Nonsense => {} + RemoteChainCmp::Longer(None) | RemoteChainCmp::Fork(None) => { + if !initial { + // sync is alerady included into handshake if initial + to_behaviour + .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { + peer_id, + message: DiffusionMessage::sync_status_v1(service.local_status().await), + })) + .await + .unwrap(); + } + } + RemoteChainCmp::Longer(Some(wanted_suffix)) => { to_behaviour .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { peer_id, - message: DiffusionMessage::sync_status_v1(service.local_status().await), + message: DiffusionMessage::request_modifiers_v1( + ModifierType::BlockHeader, + wanted_suffix.into_iter().map(ModifierId::from).collect(), + ), + })) + .await + .unwrap(); + } + RemoteChainCmp::Shorter(remote_tip) | RemoteChainCmp::Fork(Some(remote_tip)) => { + let ext = service.extension(remote_tip, conf.max_inv_size).await; + to_behaviour + .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { + peer_id, + message: DiffusionMessage::inv_v1( + ModifierType::BlockHeader, + ext.into_iter().map(ModifierId::from).collect(), + ), })) .await .unwrap(); } } - RemoteChainCmp::Longer(Some(wanted_suffix)) => { - to_behaviour - .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { - peer_id, - message: DiffusionMessage::request_modifiers_v1( - ModifierType::BlockHeader, - wanted_suffix.into_iter().map(ModifierId::from).collect(), - ), - })) - .await - .unwrap(); - } - RemoteChainCmp::Shorter(remote_tip) | RemoteChainCmp::Fork(Some(remote_tip)) => { - let ext = service.extension(remote_tip, conf.max_inv_size).await; - to_behaviour - .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { - peer_id, - message: DiffusionMessage::inv_v1( - ModifierType::BlockHeader, - ext.into_iter().map(ModifierId::from).collect(), - ), - })) - .await - .unwrap(); - } - } + }) }) } fn on_modifiers_request(&mut self, peer_id: PeerId, mod_type: ModifierType, modifiers: Vec) { let service = self.remote_sync.clone(); - self.tasks.spawn(|to_behaviour| async move { - let raw_modifiers = service.get_modifiers(mod_type, modifiers).await; - to_behaviour - .send(FromTask::ToHandler(ProtocolBehaviourOut::Send { - peer_id, - message: DiffusionMessage::modifiers_v1(mod_type, raw_modifiers), - })) - .await - .unwrap(); + self.tasks.spawn(|to_behaviour| { + Box::pin(async move { + let raw_modifiers = service.get_modifiers(mod_type, modifiers).await; + to_behaviour + .send(FromTask::ToHandler(ProtocolBehaviourOut::Send { + peer_id, + message: DiffusionMessage::modifiers_v1(mod_type, raw_modifiers), + })) + .await + .unwrap(); + }) }) } @@ -257,31 +257,33 @@ where raw_modifiers: Vec, ) { let ledger_view = self.ledger_view.clone(); - self.tasks.spawn(|to_behaviour| async move { - let mut modifiers = vec![]; - for m in raw_modifiers { - if let Ok(md) = decode_modifier(mod_type, &m) { - to_behaviour - .update_modifier(md.id(), ModifierStatus::Received) - .await; - modifiers.push(md) - } else { - to_behaviour - .send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction( - NetworkAction::BanPeer(peer_id), - ))) - .await - .unwrap(); - break; + self.tasks.spawn(|to_behaviour| { + Box::pin(async move { + let mut modifiers = vec![]; + for m in raw_modifiers { + if let Ok(md) = decode_modifier(mod_type, &m) { + to_behaviour + .update_modifier(md.id(), ModifierStatus::Received) + .await; + modifiers.push(md) + } else { + to_behaviour + .send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction( + NetworkAction::BanPeer(peer_id), + ))) + .await + .unwrap(); + break; + } } - } - stream::iter(modifiers) - .then(|md| { - let mut ledger = ledger_view.clone(); - async move { ledger.apply_modifier(md).await } - }) - .collect::>() - .await; + stream::iter(modifiers) + .then(|md| { + let mut ledger = ledger_view.clone(); + async move { ledger.apply_modifier(md).await } + }) + .collect::>() + .await; + }) }) } } @@ -329,17 +331,19 @@ where match msg { DiffusionMessageV1::Inv(Modifiers { mod_type, modifiers }) => { let history = self.history.clone(); - self.tasks.spawn(|to_behaviour| async move { - let wanted = select_wanted(&history, &to_behaviour, modifiers).await; - if !wanted.is_empty() { - to_behaviour - .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { - peer_id, - message: DiffusionMessage::request_modifiers_v1(mod_type, wanted), - })) - .await - .unwrap(); - } + self.tasks.spawn(|to_behaviour| { + Box::pin(async move { + let wanted = select_wanted(&history, &to_behaviour, modifiers).await; + if !wanted.is_empty() { + to_behaviour + .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { + peer_id, + message: DiffusionMessage::request_modifiers_v1(mod_type, wanted), + })) + .await + .unwrap(); + } + }) }) } DiffusionMessageV1::RequestModifiers(Modifiers { mod_type, modifiers }) => { @@ -360,16 +364,18 @@ where fn inject_protocol_requested_locally(&mut self, peer_id: PeerId) { let service = self.remote_sync.clone(); - self.tasks.spawn(|to_behaviour| async move { - to_behaviour - .send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction( - NetworkAction::EnablePeer { - peer_id, - handshakes: service.make_poly_handshake().await, - }, - ))) - .await - .unwrap(); + self.tasks.spawn(|to_behaviour| { + Box::pin(async move { + to_behaviour + .send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction( + NetworkAction::EnablePeer { + peer_id, + handshakes: service.make_poly_handshake().await, + }, + ))) + .await + .unwrap(); + }) }) } @@ -438,7 +444,7 @@ mod tests { let inv = DiffusionMessage::inv_v1(ModifierType::BlockHeader, inv_modifiers); let remote_pid = PeerId::random(); beh.inject_message(remote_pid, inv); - let handle = task::spawn(async move { + let handle = task::spawn_local(async move { let mut stream = BehaviourStream::new(beh); loop { match stream.select_next_some().await { @@ -475,7 +481,7 @@ mod tests { beh.inject_protocol_requested(remote_pid, Some(remote_hs)); - let handle = task::spawn(async move { + let handle = task::spawn_local(async move { let mut stream = BehaviourStream::new(beh); loop { match stream.select_next_some().await { diff --git a/spectrum-network/src/protocol_handler/multicasting.rs b/spectrum-network/src/protocol_handler/multicasting.rs index 3b2e2dd6..c887df45 100644 --- a/spectrum-network/src/protocol_handler/multicasting.rs +++ b/spectrum-network/src/protocol_handler/multicasting.rs @@ -1,6 +1,6 @@ use std::collections::{HashSet, VecDeque}; use std::ops::Sub; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; @@ -222,20 +222,22 @@ where fn inject_message(&mut self, peer_id: PeerId, content: S) { if self.overlay.parent_nodes.contains(&peer_id) { let pd = Arc::clone(&self.public_data); - self.tasks.spawn(|to_behaviour| async move { - if let Ok(ver) = content.verify(&pd).await { - to_behaviour - .send(FromTask::ToBehaviour(ApplyStatement(ver))) - .await - .unwrap(); - } else { - to_behaviour - .send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction( - NetworkAction::BanPeer(peer_id), - ))) - .await - .unwrap(); - } + self.tasks.spawn(|to_behaviour| { + Box::pin(async move { + if let Ok(ver) = content.verify(&pd).await { + to_behaviour + .send(FromTask::ToBehaviour(ApplyStatement(ver))) + .await + .unwrap(); + } else { + to_behaviour + .send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction( + NetworkAction::BanPeer(peer_id), + ))) + .await + .unwrap(); + } + }) }); } } diff --git a/spectrum-network/src/protocol_handler/pool.rs b/spectrum-network/src/protocol_handler/pool.rs index 3f193b4d..d512d13a 100644 --- a/spectrum-network/src/protocol_handler/pool.rs +++ b/spectrum-network/src/protocol_handler/pool.rs @@ -24,7 +24,7 @@ pub struct TaskPool<'a, TIn, TOut, R> { timeout: Duration, /// Communication channel with parental behaviour. channel: Sender>, - tasks: FuturesUnordered> + Send + 'a>>>, + tasks: FuturesUnordered> + 'a>>>, } impl<'a, TIn, TOut, R> TaskPool<'a, TIn, TOut, R> { @@ -40,11 +40,11 @@ impl<'a, TIn, TOut, R> TaskPool<'a, TIn, TOut, R> { pub fn spawn(&mut self, task: F) where F: FnOnce(Sender>) -> T, - T: Future + Send + 'a, + T: Future + 'a, R: 'a, { self.tasks - .push(timeout(self.timeout, task(self.channel.clone())).boxed()) + .push(Box::pin(timeout(self.timeout, task(self.channel.clone())))) } } @@ -54,7 +54,7 @@ impl<'a, TIn, TOut, R> Stream for TaskPool<'a, TIn, TOut, R> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if !self.tasks.is_empty() { - match Stream::poll_next(Pin::new(&mut self.tasks), cx) { + match Stream::poll_next(std::pin::pin!(&mut self.tasks), cx) { Poll::Ready(Some(Ok(res))) => { return Poll::Ready(Some(res)); } From c075e0bf307e168039898bec8d9bbf85f360b9fb Mon Sep 17 00:00:00 2001 From: Tim Ling <791016+kettlebell@users.noreply.github.com> Date: Sun, 30 Jul 2023 10:04:50 +1000 Subject: [PATCH 2/2] Remove unneeded calls to `Box::pin` in diffusion --- .../src/protocol_handler/diffusion.rs | 120 ++++++++---------- 1 file changed, 56 insertions(+), 64 deletions(-) diff --git a/spectrum-network/src/protocol_handler/diffusion.rs b/spectrum-network/src/protocol_handler/diffusion.rs index d350acdd..953c7424 100644 --- a/spectrum-network/src/protocol_handler/diffusion.rs +++ b/spectrum-network/src/protocol_handler/diffusion.rs @@ -176,7 +176,7 @@ where let service = self.remote_sync.clone(); let conf = self.conf; self.tasks.spawn(|to_behaviour| { - Box::pin(async move { + async move { let peer_state = service.remote_state(peer_status).await; to_behaviour.update_peer(peer_id, peer_state.clone()).await; if initial { @@ -230,23 +230,21 @@ where .unwrap(); } } - }) + } }) } fn on_modifiers_request(&mut self, peer_id: PeerId, mod_type: ModifierType, modifiers: Vec) { let service = self.remote_sync.clone(); - self.tasks.spawn(|to_behaviour| { - Box::pin(async move { - let raw_modifiers = service.get_modifiers(mod_type, modifiers).await; - to_behaviour - .send(FromTask::ToHandler(ProtocolBehaviourOut::Send { - peer_id, - message: DiffusionMessage::modifiers_v1(mod_type, raw_modifiers), - })) - .await - .unwrap(); - }) + self.tasks.spawn(|to_behaviour| async move { + let raw_modifiers = service.get_modifiers(mod_type, modifiers).await; + to_behaviour + .send(FromTask::ToHandler(ProtocolBehaviourOut::Send { + peer_id, + message: DiffusionMessage::modifiers_v1(mod_type, raw_modifiers), + })) + .await + .unwrap(); }) } @@ -257,33 +255,31 @@ where raw_modifiers: Vec, ) { let ledger_view = self.ledger_view.clone(); - self.tasks.spawn(|to_behaviour| { - Box::pin(async move { - let mut modifiers = vec![]; - for m in raw_modifiers { - if let Ok(md) = decode_modifier(mod_type, &m) { - to_behaviour - .update_modifier(md.id(), ModifierStatus::Received) - .await; - modifiers.push(md) - } else { - to_behaviour - .send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction( - NetworkAction::BanPeer(peer_id), - ))) - .await - .unwrap(); - break; - } + self.tasks.spawn(|to_behaviour| async move { + let mut modifiers = vec![]; + for m in raw_modifiers { + if let Ok(md) = decode_modifier(mod_type, &m) { + to_behaviour + .update_modifier(md.id(), ModifierStatus::Received) + .await; + modifiers.push(md) + } else { + to_behaviour + .send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction( + NetworkAction::BanPeer(peer_id), + ))) + .await + .unwrap(); + break; } - stream::iter(modifiers) - .then(|md| { - let mut ledger = ledger_view.clone(); - async move { ledger.apply_modifier(md).await } - }) - .collect::>() - .await; - }) + } + stream::iter(modifiers) + .then(|md| { + let mut ledger = ledger_view.clone(); + async move { ledger.apply_modifier(md).await } + }) + .collect::>() + .await; }) } } @@ -331,19 +327,17 @@ where match msg { DiffusionMessageV1::Inv(Modifiers { mod_type, modifiers }) => { let history = self.history.clone(); - self.tasks.spawn(|to_behaviour| { - Box::pin(async move { - let wanted = select_wanted(&history, &to_behaviour, modifiers).await; - if !wanted.is_empty() { - to_behaviour - .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { - peer_id, - message: DiffusionMessage::request_modifiers_v1(mod_type, wanted), - })) - .await - .unwrap(); - } - }) + self.tasks.spawn(|to_behaviour| async move { + let wanted = select_wanted(&history, &to_behaviour, modifiers).await; + if !wanted.is_empty() { + to_behaviour + .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { + peer_id, + message: DiffusionMessage::request_modifiers_v1(mod_type, wanted), + })) + .await + .unwrap(); + } }) } DiffusionMessageV1::RequestModifiers(Modifiers { mod_type, modifiers }) => { @@ -364,18 +358,16 @@ where fn inject_protocol_requested_locally(&mut self, peer_id: PeerId) { let service = self.remote_sync.clone(); - self.tasks.spawn(|to_behaviour| { - Box::pin(async move { - to_behaviour - .send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction( - NetworkAction::EnablePeer { - peer_id, - handshakes: service.make_poly_handshake().await, - }, - ))) - .await - .unwrap(); - }) + self.tasks.spawn(|to_behaviour| async move { + to_behaviour + .send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction( + NetworkAction::EnablePeer { + peer_id, + handshakes: service.make_poly_handshake().await, + }, + ))) + .await + .unwrap(); }) }