diff --git a/spectrum-crypto/src/lib.rs b/spectrum-crypto/src/lib.rs index ad75d09b..5029a4f7 100644 --- a/spectrum-crypto/src/lib.rs +++ b/spectrum-crypto/src/lib.rs @@ -1,4 +1,4 @@ -use async_trait::async_trait; +#![feature(async_fn_in_trait)] pub mod digest; mod hash; @@ -15,7 +15,6 @@ pub trait VerifiableAgainst

{ pub struct Verified(pub S); /// Some statement which can be verified against public data `P`. -#[async_trait] pub trait AsyncVerifiable

: Send + Sync + Sized { type Err: Send; async fn verify(self, public_data: &P) -> Result, Self::Err>; diff --git a/spectrum-network/src/lib.rs b/spectrum-network/src/lib.rs index 73610c7d..2bced3a5 100644 --- a/spectrum-network/src/lib.rs +++ b/spectrum-network/src/lib.rs @@ -1,9 +1,11 @@ +#![feature(async_fn_in_trait)] + pub mod network_controller; +pub mod one_shot_upgrade; pub mod peer_conn_handler; pub mod peer_manager; pub mod protocol; +pub mod protocol_api; pub mod protocol_handler; pub mod protocol_upgrade; pub mod types; -pub mod protocol_api; -pub mod one_shot_upgrade; diff --git a/spectrum-network/src/protocol_handler/diffusion.rs b/spectrum-network/src/protocol_handler/diffusion.rs index df368e4d..953c7424 100644 --- a/spectrum-network/src/protocol_handler/diffusion.rs +++ b/spectrum-network/src/protocol_handler/diffusion.rs @@ -67,13 +67,11 @@ enum DiffusionBehaviourIn { }, } -#[async_trait::async_trait] trait DiffusionStateWrite { async fn update_peer(&self, peer_id: PeerId, peer_state: SyncState); async fn update_modifier(&self, modifier_id: ModifierId, status: ModifierStatus); } -#[async_trait::async_trait] impl DiffusionStateWrite for Sender> { async fn update_peer(&self, peer_id: PeerId, peer_state: SyncState) { self.send(FromTask::ToBehaviour(DiffusionBehaviourIn::UpdatePeer { @@ -94,12 +92,10 @@ impl DiffusionStateWrite for Sender ModifierStatus; } -#[async_trait::async_trait] impl DiffusionStateRead for Sender> { async fn modifier_status(&self, modifier: ModifierId) -> ModifierStatus { let (snd, recv) = oneshot::channel(); @@ -179,58 +175,60 @@ where fn on_sync(&mut self, peer_id: PeerId, peer_status: SyncStatus, initial: bool) { let service = self.remote_sync.clone(); let conf = self.conf; - self.tasks.spawn(|to_behaviour| async move { - let peer_state = service.remote_state(peer_status).await; - to_behaviour.update_peer(peer_id, peer_state.clone()).await; - if initial { - to_behaviour - .send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction( - NetworkAction::EnablePeer { - peer_id, - handshakes: service.make_poly_handshake().await, - }, - ))) - .await - .unwrap(); - } - match peer_state.cmp { - RemoteChainCmp::Equal | RemoteChainCmp::Nonsense => {} - RemoteChainCmp::Longer(None) | RemoteChainCmp::Fork(None) => { - if !initial { - // sync is alerady included into handshake if initial + self.tasks.spawn(|to_behaviour| { + async move { + let peer_state = service.remote_state(peer_status).await; + to_behaviour.update_peer(peer_id, peer_state.clone()).await; + if initial { + to_behaviour + .send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction( + NetworkAction::EnablePeer { + peer_id, + handshakes: service.make_poly_handshake().await, + }, + ))) + .await + .unwrap(); + } + match peer_state.cmp { + RemoteChainCmp::Equal | RemoteChainCmp::Nonsense => {} + RemoteChainCmp::Longer(None) | RemoteChainCmp::Fork(None) => { + if !initial { + // sync is alerady included into handshake if initial + to_behaviour + .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { + peer_id, + message: DiffusionMessage::sync_status_v1(service.local_status().await), + })) + .await + .unwrap(); + } + } + RemoteChainCmp::Longer(Some(wanted_suffix)) => { to_behaviour .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { peer_id, - message: DiffusionMessage::sync_status_v1(service.local_status().await), + message: DiffusionMessage::request_modifiers_v1( + ModifierType::BlockHeader, + wanted_suffix.into_iter().map(ModifierId::from).collect(), + ), + })) + .await + .unwrap(); + } + RemoteChainCmp::Shorter(remote_tip) | RemoteChainCmp::Fork(Some(remote_tip)) => { + let ext = service.extension(remote_tip, conf.max_inv_size).await; + to_behaviour + .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { + peer_id, + message: DiffusionMessage::inv_v1( + ModifierType::BlockHeader, + ext.into_iter().map(ModifierId::from).collect(), + ), })) .await .unwrap(); } - } - RemoteChainCmp::Longer(Some(wanted_suffix)) => { - to_behaviour - .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { - peer_id, - message: DiffusionMessage::request_modifiers_v1( - ModifierType::BlockHeader, - wanted_suffix.into_iter().map(ModifierId::from).collect(), - ), - })) - .await - .unwrap(); - } - RemoteChainCmp::Shorter(remote_tip) | RemoteChainCmp::Fork(Some(remote_tip)) => { - let ext = service.extension(remote_tip, conf.max_inv_size).await; - to_behaviour - .send(FromTask::ToHandler(DiffusionBehaviourOut::Send { - peer_id, - message: DiffusionMessage::inv_v1( - ModifierType::BlockHeader, - ext.into_iter().map(ModifierId::from).collect(), - ), - })) - .await - .unwrap(); } } }) @@ -438,7 +436,7 @@ mod tests { let inv = DiffusionMessage::inv_v1(ModifierType::BlockHeader, inv_modifiers); let remote_pid = PeerId::random(); beh.inject_message(remote_pid, inv); - let handle = task::spawn(async move { + let handle = task::spawn_local(async move { let mut stream = BehaviourStream::new(beh); loop { match stream.select_next_some().await { @@ -475,7 +473,7 @@ mod tests { beh.inject_protocol_requested(remote_pid, Some(remote_hs)); - let handle = task::spawn(async move { + let handle = task::spawn_local(async move { let mut stream = BehaviourStream::new(beh); loop { match stream.select_next_some().await { diff --git a/spectrum-network/src/protocol_handler/multicasting.rs b/spectrum-network/src/protocol_handler/multicasting.rs index 3b2e2dd6..c887df45 100644 --- a/spectrum-network/src/protocol_handler/multicasting.rs +++ b/spectrum-network/src/protocol_handler/multicasting.rs @@ -1,6 +1,6 @@ use std::collections::{HashSet, VecDeque}; use std::ops::Sub; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; @@ -222,20 +222,22 @@ where fn inject_message(&mut self, peer_id: PeerId, content: S) { if self.overlay.parent_nodes.contains(&peer_id) { let pd = Arc::clone(&self.public_data); - self.tasks.spawn(|to_behaviour| async move { - if let Ok(ver) = content.verify(&pd).await { - to_behaviour - .send(FromTask::ToBehaviour(ApplyStatement(ver))) - .await - .unwrap(); - } else { - to_behaviour - .send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction( - NetworkAction::BanPeer(peer_id), - ))) - .await - .unwrap(); - } + self.tasks.spawn(|to_behaviour| { + Box::pin(async move { + if let Ok(ver) = content.verify(&pd).await { + to_behaviour + .send(FromTask::ToBehaviour(ApplyStatement(ver))) + .await + .unwrap(); + } else { + to_behaviour + .send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction( + NetworkAction::BanPeer(peer_id), + ))) + .await + .unwrap(); + } + }) }); } } diff --git a/spectrum-network/src/protocol_handler/pool.rs b/spectrum-network/src/protocol_handler/pool.rs index 3f193b4d..d512d13a 100644 --- a/spectrum-network/src/protocol_handler/pool.rs +++ b/spectrum-network/src/protocol_handler/pool.rs @@ -24,7 +24,7 @@ pub struct TaskPool<'a, TIn, TOut, R> { timeout: Duration, /// Communication channel with parental behaviour. channel: Sender>, - tasks: FuturesUnordered> + Send + 'a>>>, + tasks: FuturesUnordered> + 'a>>>, } impl<'a, TIn, TOut, R> TaskPool<'a, TIn, TOut, R> { @@ -40,11 +40,11 @@ impl<'a, TIn, TOut, R> TaskPool<'a, TIn, TOut, R> { pub fn spawn(&mut self, task: F) where F: FnOnce(Sender>) -> T, - T: Future + Send + 'a, + T: Future + 'a, R: 'a, { self.tasks - .push(timeout(self.timeout, task(self.channel.clone())).boxed()) + .push(Box::pin(timeout(self.timeout, task(self.channel.clone())))) } } @@ -54,7 +54,7 @@ impl<'a, TIn, TOut, R> Stream for TaskPool<'a, TIn, TOut, R> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if !self.tasks.is_empty() { - match Stream::poll_next(Pin::new(&mut self.tasks), cx) { + match Stream::poll_next(std::pin::pin!(&mut self.tasks), cx) { Poll::Ready(Some(Ok(res))) => { return Poll::Ready(Some(res)); }