From aab8ef2726fdbd474159e4d509a263d0e08c37da Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 28 Aug 2024 22:00:07 +0000 Subject: [PATCH] feat: parallelize IMAP and SMTP connection attempts (#5915) Previously for each connection candidate (essentially host and port pair) after resolving the host to a list of IPs Delta Chat iterated IP addresses one by one. Now for IMAP and SMTP we try up to 5 IP addresses in parallel. We start with one connection and add more connections later. If some connection fails, e.g. we try to connect to IPv6 on IPv4 network and get "Network is unreachable" (ENETUNREACH) error, we replace failed connection with another one immediately. Co-authored-by: Hocuri --- src/configure.rs | 2 +- src/imap/client.rs | 94 ++++++++++++++++++++++++++-------------- src/net.rs | 103 ++++++++++++++++++++++++++++++++++++-------- src/smtp/connect.rs | 85 +++++++++++++++++++++++------------- 4 files changed, 204 insertions(+), 80 deletions(-) diff --git a/src/configure.rs b/src/configure.rs index c8f8a062e0..34c2cee70b 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -576,7 +576,7 @@ async fn get_autoconfig( async fn nicer_configuration_error(context: &Context, e: String) -> String { if e.to_lowercase().contains("could not resolve") - || e.to_lowercase().contains("no dns resolution results") + || e.to_lowercase().contains("connection attempts") || e.to_lowercase() .contains("temporary failure in name resolution") || e.to_lowercase().contains("name or service not known") diff --git a/src/imap/client.rs b/src/imap/client.rs index 74d64c166e..b6b6ffeb37 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; use std::ops::{Deref, DerefMut}; -use anyhow::{format_err, Context as _, Result}; +use anyhow::{Context as _, Result}; use async_imap::Client as ImapClient; use async_imap::Session as ImapSession; use fast_socks5::client::Socks5Stream; @@ -14,7 +14,9 @@ use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionStream; use crate::net::tls::wrap_tls; -use crate::net::{connect_tcp_inner, connect_tls_inner, update_connection_history}; +use crate::net::{ + connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history, +}; use crate::socks::Socks5Config; use crate::tools::time; @@ -106,6 +108,53 @@ impl Client { Ok(Session::new(session, capabilities)) } + async fn connection_attempt( + context: Context, + host: String, + security: ConnectionSecurity, + resolved_addr: SocketAddr, + strict_tls: bool, + ) -> Result { + let context = &context; + let host = &host; + info!( + context, + "Attempting IMAP connection to {host} ({resolved_addr})." + ); + let res = match security { + ConnectionSecurity::Tls => { + Client::connect_secure(resolved_addr, host, strict_tls).await + } + ConnectionSecurity::Starttls => { + Client::connect_starttls(resolved_addr, host, strict_tls).await + } + ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await, + }; + match res { + Ok(client) => { + let ip_addr = resolved_addr.ip().to_string(); + let port = resolved_addr.port(); + + let save_cache = match security { + ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, + ConnectionSecurity::Plain => false, + }; + if save_cache { + update_connect_timestamp(context, host, &ip_addr).await?; + } + update_connection_history(context, "imap", host, port, &ip_addr, time()).await?; + Ok(client) + } + Err(err) => { + warn!( + context, + "Failed to connect to {host} ({resolved_addr}): {err:#}." + ); + Err(err) + } + } + } + pub async fn connect( context: &Context, socks5_config: Option, @@ -131,40 +180,21 @@ impl Client { }; Ok(client) } else { - let mut first_error = None; let load_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; - for resolved_addr in - lookup_host_with_cache(context, host, port, "imap", load_cache).await? - { - let res = match security { - ConnectionSecurity::Tls => { - Client::connect_secure(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Starttls => { - Client::connect_starttls(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await, - }; - match res { - Ok(client) => { - let ip_addr = resolved_addr.ip().to_string(); - if load_cache { - update_connect_timestamp(context, host, &ip_addr).await?; - } - update_connection_history(context, "imap", host, port, &ip_addr, time()) - .await?; - return Ok(client); - } - Err(err) => { - warn!(context, "Failed to connect to {resolved_addr}: {err:#}."); - first_error.get_or_insert(err); - } - } - } - Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) + + let connection_futures = + lookup_host_with_cache(context, host, port, "imap", load_cache) + .await? + .into_iter() + .map(|resolved_addr| { + let context = context.clone(); + let host = host.to_string(); + Self::connection_attempt(context, host, security, resolved_addr, strict_tls) + }); + run_connection_attempts(connection_futures).await } } diff --git a/src/net.rs b/src/net.rs index 38ef5cf121..132cbff906 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,4 +1,5 @@ //! # Common network utilities. +use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; use std::time::Duration; @@ -6,6 +7,7 @@ use std::time::Duration; use anyhow::{format_err, Context as _, Result}; use async_native_tls::TlsStream; use tokio::net::TcpStream; +use tokio::task::JoinSet; use tokio::time::timeout; use tokio_io_timeout::TimeoutStream; @@ -130,6 +132,84 @@ pub(crate) async fn connect_tls_inner( Ok(tls_stream) } +/// Runs connection attempt futures. +/// +/// Accepts iterator of connection attempt futures +/// and runs them until one of them succeeds +/// or all of them fail. +/// +/// If all connection attempts fail, returns the first error. +/// +/// This functions starts with one connection attempt and maintains +/// up to five parallel connection attempts if connecting takes time. +pub(crate) async fn run_connection_attempts(mut futures: I) -> Result +where + I: Iterator, + F: Future> + Send + 'static, + O: Send + 'static, +{ + let mut connection_attempt_set = JoinSet::new(); + + // Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s. + // This way we can have up to 5 parallel connection attempts at the same time. + let mut delays = [ + Duration::from_millis(300), + Duration::from_secs(1), + Duration::from_secs(5), + Duration::from_secs(10), + ] + .into_iter(); + + let mut first_error = None; + + let res = loop { + if let Some(fut) = futures.next() { + connection_attempt_set.spawn(fut); + } + + let one_year = Duration::from_secs(60 * 60 * 24 * 365); + let delay = delays.next().unwrap_or(one_year); // one year can be treated as infinitely long here + let Ok(res) = timeout(delay, connection_attempt_set.join_next()).await else { + // The delay for starting the next connection attempt has expired. + // `continue` the loop to push the next connection into connection_attempt_set. + continue; + }; + + match res { + Some(res) => { + match res.context("Failed to join task") { + Ok(Ok(conn)) => { + // Successfully connected. + break Ok(conn); + } + Ok(Err(err)) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + Err(err) => { + break Err(err); + } + } + } + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break Err( + first_error.unwrap_or_else(|| format_err!("No connection attempts were made")) + ); + } + } + }; + + // Abort remaining connection attempts and free resources + // such as OS sockets and `Context` references + // held by connection attempt tasks. + connection_attempt_set.shutdown().await; + + res +} + /// If `load_cache` is true, may use cached DNS results. /// Because the cache may be poisoned with incorrect results by networks hijacking DNS requests, /// this option should only be used when connection is authenticated, @@ -142,22 +222,9 @@ pub(crate) async fn connect_tcp( port: u16, load_cache: bool, ) -> Result>>> { - let mut first_error = None; - - for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache).await? { - match connect_tcp_inner(resolved_addr).await { - Ok(stream) => { - return Ok(stream); - } - Err(err) => { - warn!( - context, - "Failed to connect to {}: {:#}.", resolved_addr, err - ); - first_error.get_or_insert(err); - } - } - } - - Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) + let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache) + .await? + .into_iter() + .map(connect_tcp_inner); + run_connection_attempts(connection_futures).await } diff --git a/src/smtp/connect.rs b/src/smtp/connect.rs index 80913abe6c..29169a6c63 100644 --- a/src/smtp/connect.rs +++ b/src/smtp/connect.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; -use anyhow::{bail, format_err, Context as _, Result}; +use anyhow::{bail, Context as _, Result}; use async_smtp::{SmtpClient, SmtpTransport}; use tokio::io::BufStream; @@ -11,7 +11,9 @@ use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionBufStream; use crate::net::tls::wrap_tls; -use crate::net::{connect_tcp_inner, connect_tls_inner, update_connection_history}; +use crate::net::{ + connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history, +}; use crate::oauth2::get_oauth2_access_token; use crate::socks::Socks5Config; use crate::tools::time; @@ -72,6 +74,49 @@ pub(crate) async fn connect_and_auth( Ok(transport) } +async fn connection_attempt( + context: Context, + host: String, + security: ConnectionSecurity, + resolved_addr: SocketAddr, + strict_tls: bool, +) -> Result> { + let context = &context; + let host = &host; + info!( + context, + "Attempting SMTP connection to {host} ({resolved_addr})." + ); + let res = match security { + ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await, + ConnectionSecurity::Starttls => connect_starttls(resolved_addr, host, strict_tls).await, + ConnectionSecurity::Plain => connect_insecure(resolved_addr).await, + }; + match res { + Ok(stream) => { + let ip_addr = resolved_addr.ip().to_string(); + let port = resolved_addr.port(); + + let save_cache = match security { + ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, + ConnectionSecurity::Plain => false, + }; + if save_cache { + update_connect_timestamp(context, host, &ip_addr).await?; + } + update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?; + Ok(stream) + } + Err(err) => { + warn!( + context, + "Failed to connect to {host} ({resolved_addr}): {err:#}." + ); + Err(err) + } + } +} + /// Returns TLS, STARTTLS or plaintext connection /// using SOCKS5 or direct connection depending on the given configuration. /// @@ -106,38 +151,20 @@ async fn connect_stream( }; Ok(stream) } else { - let mut first_error = None; let load_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; - for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache).await? - { - let res = match security { - ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await, - ConnectionSecurity::Starttls => { - connect_starttls(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Plain => connect_insecure(resolved_addr).await, - }; - match res { - Ok(stream) => { - let ip_addr = resolved_addr.ip().to_string(); - if load_cache { - update_connect_timestamp(context, host, &ip_addr).await?; - } - update_connection_history(context, "smtp", host, port, &ip_addr, time()) - .await?; - return Ok(stream); - } - Err(err) => { - warn!(context, "Failed to connect to {resolved_addr}: {err:#}."); - first_error.get_or_insert(err); - } - } - } - Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) + let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache) + .await? + .into_iter() + .map(|resolved_addr| { + let context = context.clone(); + let host = host.to_string(); + connection_attempt(context, host, security, resolved_addr, strict_tls) + }); + run_connection_attempts(connection_futures).await } }