Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove code that can panic #5

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion spectrum-network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod network_controller;
pub mod peer_conn_handler;
pub mod peer_manager;
pub mod protocol;
pub mod protocol_api;
pub mod protocol_handler;
pub mod protocol_upgrade;
pub mod types;
pub mod protocol_api;
73 changes: 49 additions & 24 deletions spectrum-network/src/network_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::peer_conn_handler::{
use crate::peer_manager::{PeerEvents, PeerManagerOut, Peers};
use crate::protocol::ProtocolConfig;
use crate::protocol_api::ProtocolEvents;
use crate::protocol_upgrade::supported_protocol_vers::{SupportedProtocolId, SupportedProtocolIdMap};
use crate::types::{ProtocolId, ProtocolVer};

use libp2p::core::connection::ConnectionId;
Expand All @@ -24,7 +25,7 @@ use std::task::{Context, Poll};
use crate::peer_manager::data::{ConnectionLossReason, ReputationChange};
use crate::protocol_upgrade::handshake::PolyVerHandshakeSpec;
use futures::channel::mpsc::{Receiver, Sender};
use futures::{SinkExt, Stream};
use futures::{FutureExt, SinkExt, Stream};

/// States of an enabled protocol.
#[derive(Debug)]
Expand Down Expand Up @@ -92,7 +93,7 @@ pub enum NetworkControllerIn {
/// A directive to enable the specified protocol with the specified peer.
EnableProtocol {
/// The desired protocol.
protocol: ProtocolId,
protocol: SupportedProtocolId,
/// A specific peer we should start the protocol with.
peer: PeerId,
/// A handshake to send to the peer upon negotiation of protocol substream.
Expand All @@ -108,7 +109,7 @@ pub enum NetworkControllerIn {
/// External API to network controller.
pub trait NetworkAPI {
/// Enables the specified protocol with the specified peer.
fn enable_protocol(&self, protocol: ProtocolId, peer: PeerId, handshake: PolyVerHandshakeSpec);
fn enable_protocol(&self, protocol: SupportedProtocolId, peer: PeerId, handshake: PolyVerHandshakeSpec);

/// Updates the set of protocols supported by the specified peer.
fn update_peer_protocols(&self, peer: PeerId, protocols: Vec<ProtocolId>);
Expand All @@ -120,7 +121,7 @@ pub struct NetworkMailbox {
}

impl NetworkAPI for NetworkMailbox {
fn enable_protocol(&self, protocol: ProtocolId, peer: PeerId, handshake: PolyVerHandshakeSpec) {
fn enable_protocol(&self, protocol: SupportedProtocolId, peer: PeerId, handshake: PolyVerHandshakeSpec) {
let _ = futures::executor::block_on(self.mailbox_snd.clone().send(
NetworkControllerIn::EnableProtocol {
protocol,
Expand Down Expand Up @@ -217,7 +218,7 @@ impl<TPeers, TPeerManager, THandler> NetworkEvents for NetworkController<TPeers,
pub struct NetworkController<TPeers, TPeerManager, THandler> {
conn_handler_conf: PeerConnHandlerConf,
/// All supported protocols and their handlers
supported_protocols: HashMap<ProtocolId, (ProtocolConfig, THandler)>,
supported_protocols: SupportedProtocolIdMap<(ProtocolConfig, THandler)>,
/// PeerManager API
peers: TPeers,
/// PeerManager stream itself
Expand All @@ -240,7 +241,7 @@ where
) -> Self {
Self {
conn_handler_conf,
supported_protocols,
supported_protocols: supported_protocols.into(),
peers,
peer_manager,
enabled_peers: HashMap::new(),
Expand All @@ -254,8 +255,7 @@ where
self.conn_handler_conf.clone(),
self.supported_protocols
.iter()
.clone()
.map(|(prot_id, (conf, _))| (*prot_id, conf.clone()))
.map(|(prot_id, (conf, _))| (prot_id, conf.clone()))
.collect::<Vec<_>>(),
)
}
Expand Down Expand Up @@ -361,7 +361,7 @@ where
{
let protocol_id = protocol_tag.protocol_id();
let protocol_ver = protocol_tag.protocol_ver();
match enabled_protocols.entry(protocol_id) {
match enabled_protocols.entry(protocol_id.get_inner()) {
Entry::Occupied(mut entry) => {
trace!(
"Current state of protocol {:?} is {:?}",
Expand All @@ -371,16 +371,20 @@ where
if let (EnabledProtocol::PendingEnable, handler) = entry.get() {
handler.protocol_enabled(
peer_id,
protocol_ver,
protocol_ver.get_inner(),
out_channel.clone(),
handshake,
);
let enabled_protocol = EnabledProtocol::Enabled {
ver: protocol_ver,
ver: protocol_ver.get_inner(),
sink: out_channel,
};
entry.insert((enabled_protocol, handler.clone()));
self.protocol_enabled(peer_id, protocol_id, protocol_ver);
self.protocol_enabled(
peer_id,
protocol_id.get_inner(),
protocol_ver.get_inner(),
);
}
}
Entry::Vacant(entry) => {
Expand All @@ -398,12 +402,16 @@ where
}) = self.enabled_peers.get_mut(&peer_id)
{
let protocol_id = protocol_tag.protocol_id();
let (_, prot_handler) = self.supported_protocols.get(&protocol_id).unwrap();
match enabled_protocols.entry(protocol_id) {
let (_, prot_handler) = self.supported_protocols.get_supported(protocol_id);
match enabled_protocols.entry(protocol_id.get_inner()) {
Entry::Vacant(entry) => {
entry.insert((EnabledProtocol::PendingApprove, prot_handler.clone()));
prot_handler.protocol_requested(peer_id, protocol_tag.protocol_ver(), handshake);
self.protocol_pending_approve(peer_id, protocol_id);
prot_handler.protocol_requested(
peer_id,
protocol_tag.protocol_ver().get_inner(),
handshake,
);
self.protocol_pending_approve(peer_id, protocol_id.get_inner());
}
Entry::Occupied(_) => {
warn!(
Expand All @@ -420,9 +428,26 @@ where
}
}
}
ConnHandlerOut::ClosedByPeer(protocol_id)
| ConnHandlerOut::RefusedToOpen(protocol_id)
| ConnHandlerOut::Closed(protocol_id) => {
ConnHandlerOut::ClosedByPeer(protocol_id) | ConnHandlerOut::RefusedToOpen(protocol_id) => {
if let Some(ConnectedPeer::Connected {
enabled_protocols, ..
}) = self.enabled_peers.get_mut(&peer_id)
{
match enabled_protocols.entry(protocol_id.get_inner()) {
Entry::Occupied(entry) => {
trace!(
"Peer {:?} closed the substream for protocol {:?}",
peer_id,
protocol_id
);
entry.remove();
}
Entry::Vacant(_) => {}
}
}
}

ConnHandlerOut::Closed(protocol_id) => {
if let Some(ConnectedPeer::Connected {
enabled_protocols, ..
}) = self.enabled_peers.get_mut(&peer_id)
Expand Down Expand Up @@ -553,8 +578,8 @@ where
ConnectedPeer::Connected {
enabled_protocols, ..
} => {
let (_, prot_handler) = self.supported_protocols.get(&protocol).unwrap();
match enabled_protocols.entry(protocol) {
let (_, prot_handler) = self.supported_protocols.get_supported(protocol);
match enabled_protocols.entry(protocol.get_inner()) {
Entry::Occupied(_) => warn!(
"PM requested already enabled protocol {:?} with peer {:?}",
protocol, pid
Expand All @@ -565,9 +590,8 @@ where
prot_handler.clone(),
));
prot_handler.protocol_requested_local(pid);
self.protocol_pending_enable(pid, protocol);
}
};
}
}
ConnectedPeer::PendingConnect
| ConnectedPeer::PendingApprove(_)
Expand Down Expand Up @@ -602,7 +626,8 @@ where
enabled_protocols,
}) = self.enabled_peers.get_mut(&peer_id)
{
let (_, prot_handler) = self.supported_protocols.get(&protocol_id).unwrap();
let (_, prot_handler) = self.supported_protocols.get_supported(protocol_id);
let protocol_id = protocol_id.get_inner();
match enabled_protocols.entry(protocol_id) {
Entry::Occupied(protocol_entry) => match protocol_entry.remove_entry().1 {
// Protocol handler approves either outbound or inbound protocol request.
Expand Down
Loading