Skip to content

Commit

Permalink
parallelize connect_tcp
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Aug 27, 2024
1 parent ce32ded commit e7fbeb7
Showing 1 changed file with 68 additions and 11 deletions.
79 changes: 68 additions & 11 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use async_native_tls::TlsStream;
use tokio::net::TcpStream;
use tokio::time::timeout;
use tokio_io_timeout::TimeoutStream;
use tokio::task::JoinSet;

use crate::context::Context;
use crate::sql::Sql;
Expand Down Expand Up @@ -151,22 +152,78 @@ pub(crate) async fn connect_tcp(
port: u16,
load_cache: bool,
) -> Result<Pin<Box<TimeoutStream<TcpStream>>>> {
let mut connection_attempt_set = JoinSet::new();
let mut delay_set = JoinSet::new();

let mut connection_futures = Vec::new();
for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache)
.await?
.into_iter()
.rev()
{
let fut = connect_tcp_inner(resolved_addr);
connection_futures.push(fut);
}

// Start with one connection.
if let Some(fut) = connection_futures.pop() {
connection_attempt_set.spawn(fut);
}

for delay in CONNECTION_DELAYS {
delay_set.spawn(tokio::time::sleep(delay));
}

let mut first_error = None;

for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache).await? {
match connect_tcp_inner(resolved_addr).await {
Ok(stream) => {
return Ok(stream);
}
Err(err) => {
warn!(
context,
"Failed to connect to {}: {:#}.", resolved_addr, err
);
first_error.get_or_insert(err);
loop {
tokio::select! {
biased;

res = connection_attempt_set.join_next() => {
match res {
Some(res) => {
match res.context("Failed to join task")? {
Ok(conn) => {
// Successfully connected.
return Ok(conn);
},
Err(err) => {
// Some connection attempt failed.
first_error.get_or_insert(err);
}
}
},
None => {
// Out of connection attempts.
//
// Break out of the loop and return error.
break;
}
}
},
_ = delay_set.join_next(), if !delay_set.is_empty() => {
// Delay expired.
//
// Don't do anything other than adding
// another connection attempt to the active set.
}
}

if let Some(fut) = connection_futures.pop() {
connection_attempt_set.spawn(fut);
}
}

// Abort remaining connection attempts and free resources
// such as OS sockets and `Context` references
// held by connection attempt tasks.
//
// `delay_set` contains just `sleep` tasks
// so no need to await futures there,
// it is enough that futures are aborted
// when the set is dropped.
connection_attempt_set.shutdown().await;

Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}")))
}

0 comments on commit e7fbeb7

Please sign in to comment.