diff --git a/src/net.rs b/src/net.rs index dad174a2eb..aeb0b4867d 100644 --- a/src/net.rs +++ b/src/net.rs @@ -8,6 +8,7 @@ use async_native_tls::TlsStream; use tokio::net::TcpStream; use tokio::time::timeout; use tokio_io_timeout::TimeoutStream; +use tokio::task::JoinSet; use crate::context::Context; use crate::sql::Sql; @@ -151,22 +152,78 @@ pub(crate) async fn connect_tcp( port: u16, load_cache: bool, ) -> Result>>> { + let mut connection_attempt_set = JoinSet::new(); + let mut delay_set = JoinSet::new(); + + let mut connection_futures = Vec::new(); + for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache) + .await? + .into_iter() + .rev() + { + let fut = connect_tcp_inner(resolved_addr); + connection_futures.push(fut); + } + + // Start with one connection. + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } + + for delay in CONNECTION_DELAYS { + delay_set.spawn(tokio::time::sleep(delay)); + } + let mut first_error = None; - for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache).await? { - match connect_tcp_inner(resolved_addr).await { - Ok(stream) => { - return Ok(stream); - } - Err(err) => { - warn!( - context, - "Failed to connect to {}: {:#}.", resolved_addr, err - ); - first_error.get_or_insert(err); + loop { + tokio::select! { + biased; + + res = connection_attempt_set.join_next() => { + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + }, + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + } + }, + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break; + } + } + }, + _ = delay_set.join_next(), if !delay_set.is_empty() => { + // Delay expired. + // + // Don't do anything other than adding + // another connection attempt to the active set. } } + + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } } + // Abort remaining connection attempts and free resources + // such as OS sockets and `Context` references + // held by connection attempt tasks. + // + // `delay_set` contains just `sleep` tasks + // so no need to await futures there, + // it is enough that futures are aborted + // when the set is dropped. + connection_attempt_set.shutdown().await; + Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) }