Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve process-to-process communications #70

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions spectrum-crypto/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use async_trait::async_trait;
#![feature(async_fn_in_trait)]

pub mod digest;
mod hash;
Expand All @@ -15,7 +15,6 @@ pub trait VerifiableAgainst<P> {
pub struct Verified<S>(pub S);

/// Some statement which can be verified against public data `P`.
#[async_trait]
pub trait AsyncVerifiable<P>: Send + Sync + Sized {
type Err: Send;
async fn verify(self, public_data: &P) -> Result<Verified<Self>, Self::Err>;
Expand Down
6 changes: 4 additions & 2 deletions spectrum-network/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#![feature(async_fn_in_trait)]

pub mod network_controller;
pub mod one_shot_upgrade;
pub mod peer_conn_handler;
pub mod peer_manager;
pub mod protocol;
pub mod protocol_api;
pub mod protocol_handler;
pub mod protocol_upgrade;
pub mod types;
pub mod protocol_api;
pub mod one_shot_upgrade;
218 changes: 112 additions & 106 deletions spectrum-network/src/protocol_handler/diffusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,11 @@ enum DiffusionBehaviourIn {
},
}

#[async_trait::async_trait]
trait DiffusionStateWrite {
async fn update_peer(&self, peer_id: PeerId, peer_state: SyncState);
async fn update_modifier(&self, modifier_id: ModifierId, status: ModifierStatus);
}

#[async_trait::async_trait]
impl DiffusionStateWrite for Sender<FromTask<DiffusionBehaviourIn, DiffusionBehaviourOut>> {
async fn update_peer(&self, peer_id: PeerId, peer_state: SyncState) {
self.send(FromTask::ToBehaviour(DiffusionBehaviourIn::UpdatePeer {
Expand All @@ -94,12 +92,10 @@ impl DiffusionStateWrite for Sender<FromTask<DiffusionBehaviourIn, DiffusionBeha
}
}

#[async_trait::async_trait]
trait DiffusionStateRead {
async fn modifier_status(&self, mid: ModifierId) -> ModifierStatus;
}

#[async_trait::async_trait]
impl DiffusionStateRead for Sender<FromTask<DiffusionBehaviourIn, DiffusionBehaviourOut>> {
async fn modifier_status(&self, modifier: ModifierId) -> ModifierStatus {
let (snd, recv) = oneshot::channel();
Expand Down Expand Up @@ -179,74 +175,78 @@ where
fn on_sync(&mut self, peer_id: PeerId, peer_status: SyncStatus, initial: bool) {
let service = self.remote_sync.clone();
let conf = self.conf;
self.tasks.spawn(|to_behaviour| async move {
let peer_state = service.remote_state(peer_status).await;
to_behaviour.update_peer(peer_id, peer_state.clone()).await;
if initial {
to_behaviour
.send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction(
NetworkAction::EnablePeer {
peer_id,
handshakes: service.make_poly_handshake().await,
},
)))
.await
.unwrap();
}
match peer_state.cmp {
RemoteChainCmp::Equal | RemoteChainCmp::Nonsense => {}
RemoteChainCmp::Longer(None) | RemoteChainCmp::Fork(None) => {
if !initial {
// sync is alerady included into handshake if initial
self.tasks.spawn(|to_behaviour| {
Box::pin(async move {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure additional boxing is an improvement?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The boxing was unnecessary, I've removed the calls to Box::pin.

let peer_state = service.remote_state(peer_status).await;
to_behaviour.update_peer(peer_id, peer_state.clone()).await;
if initial {
to_behaviour
.send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction(
NetworkAction::EnablePeer {
peer_id,
handshakes: service.make_poly_handshake().await,
},
)))
.await
.unwrap();
}
match peer_state.cmp {
RemoteChainCmp::Equal | RemoteChainCmp::Nonsense => {}
RemoteChainCmp::Longer(None) | RemoteChainCmp::Fork(None) => {
if !initial {
// sync is alerady included into handshake if initial
to_behaviour
.send(FromTask::ToHandler(DiffusionBehaviourOut::Send {
peer_id,
message: DiffusionMessage::sync_status_v1(service.local_status().await),
}))
.await
.unwrap();
}
}
RemoteChainCmp::Longer(Some(wanted_suffix)) => {
to_behaviour
.send(FromTask::ToHandler(DiffusionBehaviourOut::Send {
peer_id,
message: DiffusionMessage::sync_status_v1(service.local_status().await),
message: DiffusionMessage::request_modifiers_v1(
ModifierType::BlockHeader,
wanted_suffix.into_iter().map(ModifierId::from).collect(),
),
}))
.await
.unwrap();
}
RemoteChainCmp::Shorter(remote_tip) | RemoteChainCmp::Fork(Some(remote_tip)) => {
let ext = service.extension(remote_tip, conf.max_inv_size).await;
to_behaviour
.send(FromTask::ToHandler(DiffusionBehaviourOut::Send {
peer_id,
message: DiffusionMessage::inv_v1(
ModifierType::BlockHeader,
ext.into_iter().map(ModifierId::from).collect(),
),
}))
.await
.unwrap();
}
}
RemoteChainCmp::Longer(Some(wanted_suffix)) => {
to_behaviour
.send(FromTask::ToHandler(DiffusionBehaviourOut::Send {
peer_id,
message: DiffusionMessage::request_modifiers_v1(
ModifierType::BlockHeader,
wanted_suffix.into_iter().map(ModifierId::from).collect(),
),
}))
.await
.unwrap();
}
RemoteChainCmp::Shorter(remote_tip) | RemoteChainCmp::Fork(Some(remote_tip)) => {
let ext = service.extension(remote_tip, conf.max_inv_size).await;
to_behaviour
.send(FromTask::ToHandler(DiffusionBehaviourOut::Send {
peer_id,
message: DiffusionMessage::inv_v1(
ModifierType::BlockHeader,
ext.into_iter().map(ModifierId::from).collect(),
),
}))
.await
.unwrap();
}
}
})
})
}

fn on_modifiers_request(&mut self, peer_id: PeerId, mod_type: ModifierType, modifiers: Vec<ModifierId>) {
let service = self.remote_sync.clone();
self.tasks.spawn(|to_behaviour| async move {
let raw_modifiers = service.get_modifiers(mod_type, modifiers).await;
to_behaviour
.send(FromTask::ToHandler(ProtocolBehaviourOut::Send {
peer_id,
message: DiffusionMessage::modifiers_v1(mod_type, raw_modifiers),
}))
.await
.unwrap();
self.tasks.spawn(|to_behaviour| {
Box::pin(async move {
let raw_modifiers = service.get_modifiers(mod_type, modifiers).await;
to_behaviour
.send(FromTask::ToHandler(ProtocolBehaviourOut::Send {
peer_id,
message: DiffusionMessage::modifiers_v1(mod_type, raw_modifiers),
}))
.await
.unwrap();
})
})
}

Expand All @@ -257,31 +257,33 @@ where
raw_modifiers: Vec<SerializedModifier>,
) {
let ledger_view = self.ledger_view.clone();
self.tasks.spawn(|to_behaviour| async move {
let mut modifiers = vec![];
for m in raw_modifiers {
if let Ok(md) = decode_modifier(mod_type, &m) {
to_behaviour
.update_modifier(md.id(), ModifierStatus::Received)
.await;
modifiers.push(md)
} else {
to_behaviour
.send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction(
NetworkAction::BanPeer(peer_id),
)))
.await
.unwrap();
break;
self.tasks.spawn(|to_behaviour| {
Box::pin(async move {
let mut modifiers = vec![];
for m in raw_modifiers {
if let Ok(md) = decode_modifier(mod_type, &m) {
to_behaviour
.update_modifier(md.id(), ModifierStatus::Received)
.await;
modifiers.push(md)
} else {
to_behaviour
.send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction(
NetworkAction::BanPeer(peer_id),
)))
.await
.unwrap();
break;
}
}
}
stream::iter(modifiers)
.then(|md| {
let mut ledger = ledger_view.clone();
async move { ledger.apply_modifier(md).await }
})
.collect::<Vec<_>>()
.await;
stream::iter(modifiers)
.then(|md| {
let mut ledger = ledger_view.clone();
async move { ledger.apply_modifier(md).await }
})
.collect::<Vec<_>>()
.await;
})
})
}
}
Expand Down Expand Up @@ -329,17 +331,19 @@ where
match msg {
DiffusionMessageV1::Inv(Modifiers { mod_type, modifiers }) => {
let history = self.history.clone();
self.tasks.spawn(|to_behaviour| async move {
let wanted = select_wanted(&history, &to_behaviour, modifiers).await;
if !wanted.is_empty() {
to_behaviour
.send(FromTask::ToHandler(DiffusionBehaviourOut::Send {
peer_id,
message: DiffusionMessage::request_modifiers_v1(mod_type, wanted),
}))
.await
.unwrap();
}
self.tasks.spawn(|to_behaviour| {
Box::pin(async move {
let wanted = select_wanted(&history, &to_behaviour, modifiers).await;
if !wanted.is_empty() {
to_behaviour
.send(FromTask::ToHandler(DiffusionBehaviourOut::Send {
peer_id,
message: DiffusionMessage::request_modifiers_v1(mod_type, wanted),
}))
.await
.unwrap();
}
})
})
}
DiffusionMessageV1::RequestModifiers(Modifiers { mod_type, modifiers }) => {
Expand All @@ -360,16 +364,18 @@ where

fn inject_protocol_requested_locally(&mut self, peer_id: PeerId) {
let service = self.remote_sync.clone();
self.tasks.spawn(|to_behaviour| async move {
to_behaviour
.send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction(
NetworkAction::EnablePeer {
peer_id,
handshakes: service.make_poly_handshake().await,
},
)))
.await
.unwrap();
self.tasks.spawn(|to_behaviour| {
Box::pin(async move {
to_behaviour
.send(FromTask::ToHandler(DiffusionBehaviourOut::NetworkAction(
NetworkAction::EnablePeer {
peer_id,
handshakes: service.make_poly_handshake().await,
},
)))
.await
.unwrap();
})
})
}

Expand Down Expand Up @@ -438,7 +444,7 @@ mod tests {
let inv = DiffusionMessage::inv_v1(ModifierType::BlockHeader, inv_modifiers);
let remote_pid = PeerId::random();
beh.inject_message(remote_pid, inv);
let handle = task::spawn(async move {
let handle = task::spawn_local(async move {
let mut stream = BehaviourStream::new(beh);
loop {
match stream.select_next_some().await {
Expand Down Expand Up @@ -475,7 +481,7 @@ mod tests {

beh.inject_protocol_requested(remote_pid, Some(remote_hs));

let handle = task::spawn(async move {
let handle = task::spawn_local(async move {
let mut stream = BehaviourStream::new(beh);
loop {
match stream.select_next_some().await {
Expand Down
32 changes: 17 additions & 15 deletions spectrum-network/src/protocol_handler/multicasting.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::{HashSet, VecDeque};
use std::ops::Sub;
use std::pin::Pin;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
Expand Down Expand Up @@ -222,20 +222,22 @@ where
fn inject_message(&mut self, peer_id: PeerId, content: S) {
if self.overlay.parent_nodes.contains(&peer_id) {
let pd = Arc::clone(&self.public_data);
self.tasks.spawn(|to_behaviour| async move {
if let Ok(ver) = content.verify(&pd).await {
to_behaviour
.send(FromTask::ToBehaviour(ApplyStatement(ver)))
.await
.unwrap();
} else {
to_behaviour
.send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction(
NetworkAction::BanPeer(peer_id),
)))
.await
.unwrap();
}
self.tasks.spawn(|to_behaviour| {
Box::pin(async move {
if let Ok(ver) = content.verify(&pd).await {
to_behaviour
.send(FromTask::ToBehaviour(ApplyStatement(ver)))
.await
.unwrap();
} else {
to_behaviour
.send(FromTask::ToHandler(ProtocolBehaviourOut::NetworkAction(
NetworkAction::BanPeer(peer_id),
)))
.await
.unwrap();
}
})
});
}
}
Expand Down
Loading