Skip to content

Commit 7c05d29

Browse files
authored
transport: Abort canceled dial attempts for TCP, WebSocket and Quic (#255)
The `TransportManager` initiates a dialing process on multiple addresses and multiple transports (ie TCP WebSocket). The first established connection is reported back from the Transport layer to the `TransportManger`. Then, the `Transport Manager` cancels all ongoing dialing attempts on the remaining layers. Previously, cancelling implies storing a `ConnectionID` to a HashSet. This has the downside that all ongoing dial attempts are polled to completion. In this PR, the futures that establish socket connections are now aborted directly. ### Example ```bash Manager ---> TCP (dial address A, B, C) ---> WebSocket ( dial address D, E, F) ``` T0. Manager initiates dialing on A, B, C for TCP and D, E, F on WebSocket T1. Established socket connection on address A from TCP (B and C are dropped) T2. Manager cancels D, E, F from WebSocket ### Before T2 implies adding a connectionID to a hashset: https://github.com/paritytech/litep2p/blob/14dc4cc133e4c09f06b75119970583973a8353f0/src/transport/tcp/mod.rs#L518-L519 The worst case scenario: - wait for all D, E, F to establish connection, then emit back event to manager if the connection ID was not canceled The best case scenario: - wait for one of D, E, F to establish connection, then emit back event to manager if the connection ID was not canceled https://github.com/paritytech/litep2p/blob/d50ec1014479d9a49aabbb9ae1c8587b702d0314/src/transport/tcp/mod.rs#L536-L542 ### After The future that handles dialing is abortable. This way, we don't have to wait for (worst case) all addresses to fail to connect, or (best case) one address to connect. --------- Signed-off-by: Alexandru Vasile <[email protected]>
1 parent 9ad20ff commit 7c05d29

File tree

3 files changed

+183
-83
lines changed

3 files changed

+183
-83
lines changed

src/transport/quic/mod.rs

Lines changed: 62 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ use crate::{
3434
PeerId,
3535
};
3636

37-
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
37+
use futures::{
38+
future::BoxFuture,
39+
stream::{AbortHandle, FuturesUnordered},
40+
Stream, StreamExt, TryFutureExt,
41+
};
3842
use multiaddr::{Multiaddr, Protocol};
3943
use quinn::{ClientConfig, Connecting, Connection, Endpoint, IdleTimeout};
4044

@@ -66,6 +70,25 @@ struct NegotiatedConnection {
6670
connection: Connection,
6771
}
6872

73+
#[derive(Debug)]
74+
enum RawConnectionResult {
75+
/// The first successful connection.
76+
Connected {
77+
connection_id: ConnectionId,
78+
address: Multiaddr,
79+
stream: NegotiatedConnection,
80+
},
81+
82+
/// All connection attempts failed.
83+
Failed {
84+
connection_id: ConnectionId,
85+
errors: Vec<(Multiaddr, DialError)>,
86+
},
87+
88+
/// Future was canceled.
89+
Canceled { connection_id: ConnectionId },
90+
}
91+
6992
/// QUIC transport object.
7093
pub(crate) struct QuicTransport {
7194
/// Transport handle.
@@ -92,21 +115,15 @@ pub(crate) struct QuicTransport {
92115
pending_open: HashMap<ConnectionId, (NegotiatedConnection, Litep2pEndpoint)>,
93116

94117
/// Pending raw, unnegotiated connections.
95-
pending_raw_connections: FuturesUnordered<
96-
BoxFuture<
97-
'static,
98-
Result<
99-
(ConnectionId, Multiaddr, NegotiatedConnection),
100-
(ConnectionId, Vec<(Multiaddr, DialError)>),
101-
>,
102-
>,
103-
>,
118+
pending_raw_connections: FuturesUnordered<BoxFuture<'static, RawConnectionResult>>,
104119

105120
/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
106121
opened_raw: HashMap<ConnectionId, (NegotiatedConnection, Multiaddr)>,
107122

108123
/// Canceled raw connections.
109124
canceled: HashSet<ConnectionId>,
125+
126+
cancel_futures: HashMap<ConnectionId, AbortHandle>,
110127
}
111128

112129
impl QuicTransport {
@@ -225,6 +242,7 @@ impl TransportBuilder for QuicTransport {
225242
pending_inbound_connections: HashMap::new(),
226243
pending_raw_connections: FuturesUnordered::new(),
227244
pending_connections: FuturesUnordered::new(),
245+
cancel_futures: HashMap::new(),
228246
},
229247
listen_addresses,
230248
))
@@ -407,12 +425,18 @@ impl Transport for QuicTransport {
407425
})
408426
.collect();
409427

410-
self.pending_raw_connections.push(Box::pin(async move {
428+
// Future that will resolve to the first successful connection.
429+
let future = async move {
411430
let mut errors = Vec::with_capacity(num_addresses);
412431

413432
while let Some(result) = futures.next().await {
414433
match result {
415-
Ok((address, connection)) => return Ok((connection_id, address, connection)),
434+
Ok((address, stream)) =>
435+
return RawConnectionResult::Connected {
436+
connection_id,
437+
address,
438+
stream,
439+
},
416440
Err(error) => {
417441
tracing::debug!(
418442
target: LOG_TARGET,
@@ -425,8 +449,16 @@ impl Transport for QuicTransport {
425449
}
426450
}
427451

428-
Err((connection_id, errors))
429-
}));
452+
RawConnectionResult::Failed {
453+
connection_id,
454+
errors,
455+
}
456+
};
457+
458+
let (fut, handle) = futures::future::abortable(future);
459+
let fut = fut.unwrap_or_else(move |_| RawConnectionResult::Canceled { connection_id });
460+
self.pending_raw_connections.push(Box::pin(fut));
461+
self.cancel_futures.insert(connection_id, handle);
430462

431463
Ok(())
432464
}
@@ -446,6 +478,7 @@ impl Transport for QuicTransport {
446478
/// Cancel opening connections.
447479
fn cancel(&mut self, connection_id: ConnectionId) {
448480
self.canceled.insert(connection_id);
481+
self.cancel_futures.remove(&connection_id).map(|handle| handle.abort());
449482
}
450483
}
451484

@@ -470,32 +503,35 @@ impl Stream for QuicTransport {
470503
}
471504

472505
while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) {
473-
match result {
474-
Ok((connection_id, address, stream)) => {
475-
tracing::trace!(
476-
target: LOG_TARGET,
477-
?connection_id,
478-
?address,
479-
canceled = self.canceled.contains(&connection_id),
480-
"connection opened",
481-
);
506+
tracing::trace!(target: LOG_TARGET, ?result, "raw connection result");
482507

508+
match result {
509+
RawConnectionResult::Connected {
510+
connection_id,
511+
address,
512+
stream,
513+
} =>
483514
if !self.canceled.remove(&connection_id) {
484515
self.opened_raw.insert(connection_id, (stream, address.clone()));
485516

486517
return Poll::Ready(Some(TransportEvent::ConnectionOpened {
487518
connection_id,
488519
address,
489520
}));
490-
}
491-
}
492-
Err((connection_id, errors)) =>
521+
},
522+
RawConnectionResult::Failed {
523+
connection_id,
524+
errors,
525+
} =>
493526
if !self.canceled.remove(&connection_id) {
494527
return Poll::Ready(Some(TransportEvent::OpenFailure {
495528
connection_id,
496529
errors,
497530
}));
498531
},
532+
RawConnectionResult::Canceled { connection_id } => {
533+
self.canceled.remove(&connection_id);
534+
}
499535
}
500536
}
501537

src/transport/tcp/mod.rs

Lines changed: 59 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ use crate::{
3838

3939
use futures::{
4040
future::BoxFuture,
41-
stream::{FuturesUnordered, Stream, StreamExt},
41+
stream::{AbortHandle, FuturesUnordered, Stream, StreamExt},
42+
TryFutureExt,
4243
};
4344
use multiaddr::Multiaddr;
4445
use socket2::{Domain, Socket, Type};
@@ -70,6 +71,25 @@ struct PendingInboundConnection {
7071
address: SocketAddr,
7172
}
7273

74+
#[derive(Debug)]
75+
enum RawConnectionResult {
76+
/// The first successful connection.
77+
Connected {
78+
connection_id: ConnectionId,
79+
address: Multiaddr,
80+
stream: TcpStream,
81+
},
82+
83+
/// All connection attempts failed.
84+
Failed {
85+
connection_id: ConnectionId,
86+
errors: Vec<(Multiaddr, DialError)>,
87+
},
88+
89+
/// Future was canceled.
90+
Canceled { connection_id: ConnectionId },
91+
}
92+
7393
/// TCP transport.
7494
pub(crate) struct TcpTransport {
7595
/// Transport context.
@@ -96,22 +116,16 @@ pub(crate) struct TcpTransport {
96116
>,
97117

98118
/// Pending raw, unnegotiated connections.
99-
pending_raw_connections: FuturesUnordered<
100-
BoxFuture<
101-
'static,
102-
Result<
103-
(ConnectionId, Multiaddr, TcpStream),
104-
(ConnectionId, Vec<(Multiaddr, DialError)>),
105-
>,
106-
>,
107-
>,
119+
pending_raw_connections: FuturesUnordered<BoxFuture<'static, RawConnectionResult>>,
108120

109121
/// Opened raw connection, waiting for approval/rejection from `TransportManager`.
110122
opened_raw: HashMap<ConnectionId, (TcpStream, Multiaddr)>,
111123

112124
/// Canceled raw connections.
113125
canceled: HashSet<ConnectionId>,
114126

127+
cancel_futures: HashMap<ConnectionId, AbortHandle>,
128+
115129
/// Connections which have been opened and negotiated but are being validated by the
116130
/// `TransportManager`.
117131
pending_open: HashMap<ConnectionId, NegotiatedConnection>,
@@ -284,6 +298,7 @@ impl TransportBuilder for TcpTransport {
284298
pending_inbound_connections: HashMap::new(),
285299
pending_connections: FuturesUnordered::new(),
286300
pending_raw_connections: FuturesUnordered::new(),
301+
cancel_futures: HashMap::new(),
287302
},
288303
listen_addresses,
289304
))
@@ -412,11 +427,17 @@ impl Transport for TcpTransport {
412427
})
413428
.collect();
414429

415-
self.pending_raw_connections.push(Box::pin(async move {
430+
// Future that will resolve to the first successful connection.
431+
let future = async move {
416432
let mut errors = Vec::with_capacity(num_addresses);
417433
while let Some(result) = futures.next().await {
418434
match result {
419-
Ok((address, stream)) => return Ok((connection_id, address, stream)),
435+
Ok((address, stream)) =>
436+
return RawConnectionResult::Connected {
437+
connection_id,
438+
address,
439+
stream,
440+
},
420441
Err(error) => {
421442
tracing::debug!(
422443
target: LOG_TARGET,
@@ -429,8 +450,16 @@ impl Transport for TcpTransport {
429450
}
430451
}
431452

432-
Err((connection_id, errors))
433-
}));
453+
RawConnectionResult::Failed {
454+
connection_id,
455+
errors,
456+
}
457+
};
458+
459+
let (fut, handle) = futures::future::abortable(future);
460+
let fut = fut.unwrap_or_else(move |_| RawConnectionResult::Canceled { connection_id });
461+
self.pending_raw_connections.push(Box::pin(fut));
462+
self.cancel_futures.insert(connection_id, handle);
434463

435464
Ok(())
436465
}
@@ -488,6 +517,7 @@ impl Transport for TcpTransport {
488517

489518
fn cancel(&mut self, connection_id: ConnectionId) {
490519
self.canceled.insert(connection_id);
520+
self.cancel_futures.remove(&connection_id).map(|handle| handle.abort());
491521
}
492522
}
493523

@@ -523,32 +553,35 @@ impl Stream for TcpTransport {
523553
}
524554

525555
while let Poll::Ready(Some(result)) = self.pending_raw_connections.poll_next_unpin(cx) {
526-
match result {
527-
Ok((connection_id, address, stream)) => {
528-
tracing::trace!(
529-
target: LOG_TARGET,
530-
?connection_id,
531-
?address,
532-
canceled = self.canceled.contains(&connection_id),
533-
"connection opened",
534-
);
556+
tracing::trace!(target: LOG_TARGET, ?result, "raw connection result");
535557

558+
match result {
559+
RawConnectionResult::Connected {
560+
connection_id,
561+
address,
562+
stream,
563+
} =>
536564
if !self.canceled.remove(&connection_id) {
537565
self.opened_raw.insert(connection_id, (stream, address.clone()));
538566

539567
return Poll::Ready(Some(TransportEvent::ConnectionOpened {
540568
connection_id,
541569
address,
542570
}));
543-
}
544-
}
545-
Err((connection_id, errors)) =>
571+
},
572+
RawConnectionResult::Failed {
573+
connection_id,
574+
errors,
575+
} =>
546576
if !self.canceled.remove(&connection_id) {
547577
return Poll::Ready(Some(TransportEvent::OpenFailure {
548578
connection_id,
549579
errors,
550580
}));
551581
},
582+
RawConnectionResult::Canceled { connection_id } => {
583+
self.canceled.remove(&connection_id);
584+
}
552585
}
553586
}
554587

0 commit comments

Comments
 (0)