Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use multiaddr::{Multiaddr, Protocol};
use transport::Endpoint;
use types::ConnectionId;

use crate::transport::manager::DialFailureAddresses;
pub use bandwidth::BandwidthSink;
pub use error::Error;
pub use peer_id::PeerId;
Expand Down Expand Up @@ -198,6 +199,7 @@ impl Litep2p {
config.fallback_names.clone(),
config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
let executor = Arc::clone(&litep2p_config.executor);
litep2p_config.executor.run(Box::pin(async move {
Expand All @@ -218,6 +220,7 @@ impl Litep2p {
config.fallback_names.clone(),
config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
litep2p_config.executor.run(Box::pin(async move {
RequestResponseProtocol::new(service, config).run().await
Expand All @@ -233,6 +236,7 @@ impl Litep2p {
Vec::new(),
protocol.codec(),
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
litep2p_config.executor.run(Box::pin(async move {
let _ = protocol.run(service).await;
Expand All @@ -252,6 +256,7 @@ impl Litep2p {
Vec::new(),
ping_config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
litep2p_config.executor.run(Box::pin(async move {
Ping::new(service, ping_config).run().await
Expand All @@ -275,6 +280,7 @@ impl Litep2p {
fallback_names,
kademlia_config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::Required,
);
litep2p_config.executor.run(Box::pin(async move {
let _ = Kademlia::new(service, kademlia_config).run().await;
Expand All @@ -296,6 +302,7 @@ impl Litep2p {
Vec::new(),
identify_config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
identify_config.public = Some(litep2p_config.keypair.public().into());

Expand All @@ -316,6 +323,7 @@ impl Litep2p {
Vec::new(),
bitswap_config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
litep2p_config.executor.run(Box::pin(async move {
Bitswap::new(service, bitswap_config).run().await
Expand Down
48 changes: 31 additions & 17 deletions src/transport/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ pub(crate) mod handle;
/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::transport-manager";

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum DialFailureAddresses {
Required,
NotRequired,
}

/// The connection established result.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum ConnectionEstablishedResult {
Expand Down Expand Up @@ -106,6 +112,8 @@ pub struct ProtocolContext {

/// Fallback names for the protocol.
pub fallback_names: Vec<ProtocolName>,

pub dial_failure_mode: DialFailureAddresses,
}

impl ProtocolContext {
Expand All @@ -114,11 +122,20 @@ impl ProtocolContext {
codec: ProtocolCodec,
tx: Sender<InnerTransportEvent>,
fallback_names: Vec<ProtocolName>,
dial_failure_mode: DialFailureAddresses,
) -> Self {
Self {
tx,
codec,
fallback_names,
dial_failure_mode,
}
}

fn dial_failure_addresses(&self, addresses: &[Multiaddr]) -> Vec<Multiaddr> {
match self.dial_failure_mode {
DialFailureAddresses::Required => addresses.to_vec(),
DialFailureAddresses::NotRequired => Vec::new(),
}
}
}
Expand Down Expand Up @@ -332,6 +349,7 @@ impl TransportManager {
fallback_names: Vec<ProtocolName>,
codec: ProtocolCodec,
keep_alive_timeout: Duration,
dial_failure_mode: DialFailureAddresses,
) -> TransportService {
assert!(!self.protocol_names.contains(&protocol));

Expand All @@ -352,7 +370,7 @@ impl TransportManager {

self.protocols.insert(
protocol.clone(),
ProtocolContext::new(codec, sender, fallback_names.clone()),
ProtocolContext::new(codec, sender, fallback_names.clone(), dial_failure_mode),
);
self.protocol_names.insert(protocol);
self.protocol_names.extend(fallback_names);
Expand Down Expand Up @@ -1116,10 +1134,10 @@ impl TransportManager {
?protocol,
"dial failure, notify protocol",
);
match context.tx.try_send(InnerTransportEvent::DialFailure {
peer,
addresses: vec![address.clone()],
}) {

let adresses = context.dial_failure_addresses(&[address.clone()]);

match context.tx.try_send(make_dial_failure_event(peer, adresses.clone())) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is there a way to avoid the cloning here? Maybe changing the signature of the fn to take a & Multiaddr?

I would also inline the make_dial_failure_event:

match context.tx.try_send(InnerTransportEvent::DialFailure { peer, addresses })

Instead of cloning the addreses here (addresses.clone()), we could recreate the addresses on error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have changed to this inline way, but can't still do without cloning

Ok(()) => {}
Err(_) => {
tracing::trace!(
Expand All @@ -1132,10 +1150,7 @@ impl TransportManager {
);
let _ = context
.tx
.send(InnerTransportEvent::DialFailure {
peer,
addresses: vec![address.clone()],
})
.send(make_dial_failure_event(peer, adresses.clone()))
.await;
}
}
Expand Down Expand Up @@ -1266,12 +1281,10 @@ impl TransportManager {
.collect::<Vec<_>>();

for (protocol, context) in &self.protocols {
let addresses = context.dial_failure_addresses(&addresses);
let _ = match context
.tx
.try_send(InnerTransportEvent::DialFailure {
peer,
addresses: addresses.clone(),
}) {
.try_send(make_dial_failure_event(peer, addresses.clone())) {
Ok(_) => Ok(()),
Err(_) => {
tracing::trace!(
Expand All @@ -1284,10 +1297,7 @@ impl TransportManager {

context
.tx
.send(InnerTransportEvent::DialFailure {
peer,
addresses: addresses.clone(),
})
.send(make_dial_failure_event(peer, addresses.clone()))
.await
}
};
Expand Down Expand Up @@ -1343,6 +1353,10 @@ impl TransportManager {
}
}

fn make_dial_failure_event(peer: PeerId, addresses: Vec<Multiaddr>) -> InnerTransportEvent {
InnerTransportEvent::DialFailure { peer, addresses }
}

#[cfg(test)]
mod tests {
use crate::transport::manager::{address::AddressStore, peer_state::SecondaryOrDialing};
Expand Down