diff --git a/src/configure.rs b/src/configure.rs index c8f8a062e0..34c2cee70b 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -576,7 +576,7 @@ async fn get_autoconfig( async fn nicer_configuration_error(context: &Context, e: String) -> String { if e.to_lowercase().contains("could not resolve") - || e.to_lowercase().contains("no dns resolution results") + || e.to_lowercase().contains("connection attempts") || e.to_lowercase() .contains("temporary failure in name resolution") || e.to_lowercase().contains("name or service not known") diff --git a/src/imap/client.rs b/src/imap/client.rs index 74d64c166e..b6b6ffeb37 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; use std::ops::{Deref, DerefMut}; -use anyhow::{format_err, Context as _, Result}; +use anyhow::{Context as _, Result}; use async_imap::Client as ImapClient; use async_imap::Session as ImapSession; use fast_socks5::client::Socks5Stream; @@ -14,7 +14,9 @@ use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionStream; use crate::net::tls::wrap_tls; -use crate::net::{connect_tcp_inner, connect_tls_inner, update_connection_history}; +use crate::net::{ + connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history, +}; use crate::socks::Socks5Config; use crate::tools::time; @@ -106,6 +108,53 @@ impl Client { Ok(Session::new(session, capabilities)) } + async fn connection_attempt( + context: Context, + host: String, + security: ConnectionSecurity, + resolved_addr: SocketAddr, + strict_tls: bool, + ) -> Result { + let context = &context; + let host = &host; + info!( + context, + "Attempting IMAP connection to {host} ({resolved_addr})." + ); + let res = match security { + ConnectionSecurity::Tls => { + Client::connect_secure(resolved_addr, host, strict_tls).await + } + ConnectionSecurity::Starttls => { + Client::connect_starttls(resolved_addr, host, strict_tls).await + } + ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await, + }; + match res { + Ok(client) => { + let ip_addr = resolved_addr.ip().to_string(); + let port = resolved_addr.port(); + + let save_cache = match security { + ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, + ConnectionSecurity::Plain => false, + }; + if save_cache { + update_connect_timestamp(context, host, &ip_addr).await?; + } + update_connection_history(context, "imap", host, port, &ip_addr, time()).await?; + Ok(client) + } + Err(err) => { + warn!( + context, + "Failed to connect to {host} ({resolved_addr}): {err:#}." + ); + Err(err) + } + } + } + pub async fn connect( context: &Context, socks5_config: Option, @@ -131,40 +180,21 @@ impl Client { }; Ok(client) } else { - let mut first_error = None; let load_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; - for resolved_addr in - lookup_host_with_cache(context, host, port, "imap", load_cache).await? - { - let res = match security { - ConnectionSecurity::Tls => { - Client::connect_secure(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Starttls => { - Client::connect_starttls(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await, - }; - match res { - Ok(client) => { - let ip_addr = resolved_addr.ip().to_string(); - if load_cache { - update_connect_timestamp(context, host, &ip_addr).await?; - } - update_connection_history(context, "imap", host, port, &ip_addr, time()) - .await?; - return Ok(client); - } - Err(err) => { - warn!(context, "Failed to connect to {resolved_addr}: {err:#}."); - first_error.get_or_insert(err); - } - } - } - Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) + + let connection_futures = + lookup_host_with_cache(context, host, port, "imap", load_cache) + .await? + .into_iter() + .map(|resolved_addr| { + let context = context.clone(); + let host = host.to_string(); + Self::connection_attempt(context, host, security, resolved_addr, strict_tls) + }); + run_connection_attempts(connection_futures).await } } diff --git a/src/net.rs b/src/net.rs index 38ef5cf121..132cbff906 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,4 +1,5 @@ //! # Common network utilities. +use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; use std::time::Duration; @@ -6,6 +7,7 @@ use std::time::Duration; use anyhow::{format_err, Context as _, Result}; use async_native_tls::TlsStream; use tokio::net::TcpStream; +use tokio::task::JoinSet; use tokio::time::timeout; use tokio_io_timeout::TimeoutStream; @@ -130,6 +132,84 @@ pub(crate) async fn connect_tls_inner( Ok(tls_stream) } +/// Runs connection attempt futures. +/// +/// Accepts iterator of connection attempt futures +/// and runs them until one of them succeeds +/// or all of them fail. +/// +/// If all connection attempts fail, returns the first error. +/// +/// This functions starts with one connection attempt and maintains +/// up to five parallel connection attempts if connecting takes time. +pub(crate) async fn run_connection_attempts(mut futures: I) -> Result +where + I: Iterator, + F: Future> + Send + 'static, + O: Send + 'static, +{ + let mut connection_attempt_set = JoinSet::new(); + + // Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s. + // This way we can have up to 5 parallel connection attempts at the same time. + let mut delays = [ + Duration::from_millis(300), + Duration::from_secs(1), + Duration::from_secs(5), + Duration::from_secs(10), + ] + .into_iter(); + + let mut first_error = None; + + let res = loop { + if let Some(fut) = futures.next() { + connection_attempt_set.spawn(fut); + } + + let one_year = Duration::from_secs(60 * 60 * 24 * 365); + let delay = delays.next().unwrap_or(one_year); // one year can be treated as infinitely long here + let Ok(res) = timeout(delay, connection_attempt_set.join_next()).await else { + // The delay for starting the next connection attempt has expired. + // `continue` the loop to push the next connection into connection_attempt_set. + continue; + }; + + match res { + Some(res) => { + match res.context("Failed to join task") { + Ok(Ok(conn)) => { + // Successfully connected. + break Ok(conn); + } + Ok(Err(err)) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + Err(err) => { + break Err(err); + } + } + } + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break Err( + first_error.unwrap_or_else(|| format_err!("No connection attempts were made")) + ); + } + } + }; + + // Abort remaining connection attempts and free resources + // such as OS sockets and `Context` references + // held by connection attempt tasks. + connection_attempt_set.shutdown().await; + + res +} + /// If `load_cache` is true, may use cached DNS results. /// Because the cache may be poisoned with incorrect results by networks hijacking DNS requests, /// this option should only be used when connection is authenticated, @@ -142,22 +222,9 @@ pub(crate) async fn connect_tcp( port: u16, load_cache: bool, ) -> Result>>> { - let mut first_error = None; - - for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache).await? { - match connect_tcp_inner(resolved_addr).await { - Ok(stream) => { - return Ok(stream); - } - Err(err) => { - warn!( - context, - "Failed to connect to {}: {:#}.", resolved_addr, err - ); - first_error.get_or_insert(err); - } - } - } - - Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) + let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache) + .await? + .into_iter() + .map(connect_tcp_inner); + run_connection_attempts(connection_futures).await } diff --git a/src/smtp/connect.rs b/src/smtp/connect.rs index 80913abe6c..29169a6c63 100644 --- a/src/smtp/connect.rs +++ b/src/smtp/connect.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; -use anyhow::{bail, format_err, Context as _, Result}; +use anyhow::{bail, Context as _, Result}; use async_smtp::{SmtpClient, SmtpTransport}; use tokio::io::BufStream; @@ -11,7 +11,9 @@ use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionBufStream; use crate::net::tls::wrap_tls; -use crate::net::{connect_tcp_inner, connect_tls_inner, update_connection_history}; +use crate::net::{ + connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history, +}; use crate::oauth2::get_oauth2_access_token; use crate::socks::Socks5Config; use crate::tools::time; @@ -72,6 +74,49 @@ pub(crate) async fn connect_and_auth( Ok(transport) } +async fn connection_attempt( + context: Context, + host: String, + security: ConnectionSecurity, + resolved_addr: SocketAddr, + strict_tls: bool, +) -> Result> { + let context = &context; + let host = &host; + info!( + context, + "Attempting SMTP connection to {host} ({resolved_addr})." + ); + let res = match security { + ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await, + ConnectionSecurity::Starttls => connect_starttls(resolved_addr, host, strict_tls).await, + ConnectionSecurity::Plain => connect_insecure(resolved_addr).await, + }; + match res { + Ok(stream) => { + let ip_addr = resolved_addr.ip().to_string(); + let port = resolved_addr.port(); + + let save_cache = match security { + ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, + ConnectionSecurity::Plain => false, + }; + if save_cache { + update_connect_timestamp(context, host, &ip_addr).await?; + } + update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?; + Ok(stream) + } + Err(err) => { + warn!( + context, + "Failed to connect to {host} ({resolved_addr}): {err:#}." + ); + Err(err) + } + } +} + /// Returns TLS, STARTTLS or plaintext connection /// using SOCKS5 or direct connection depending on the given configuration. /// @@ -106,38 +151,20 @@ async fn connect_stream( }; Ok(stream) } else { - let mut first_error = None; let load_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; - for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache).await? - { - let res = match security { - ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await, - ConnectionSecurity::Starttls => { - connect_starttls(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Plain => connect_insecure(resolved_addr).await, - }; - match res { - Ok(stream) => { - let ip_addr = resolved_addr.ip().to_string(); - if load_cache { - update_connect_timestamp(context, host, &ip_addr).await?; - } - update_connection_history(context, "smtp", host, port, &ip_addr, time()) - .await?; - return Ok(stream); - } - Err(err) => { - warn!(context, "Failed to connect to {resolved_addr}: {err:#}."); - first_error.get_or_insert(err); - } - } - } - Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) + let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache) + .await? + .into_iter() + .map(|resolved_addr| { + let context = context.clone(); + let host = host.to_string(); + connection_attempt(context, host, security, resolved_addr, strict_tls) + }); + run_connection_attempts(connection_futures).await } }