From 42dc9621c047ebbe19c222b8dd3071540c89280c Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 25 Aug 2024 07:45:40 +0000 Subject: [PATCH] feat: parallelize IMAP and SMTP connection attempts --- src/imap/client.rs | 147 ++++++++++++++++++++++++++++++++++++-------- src/net.rs | 88 ++++++++++++++++++++++---- src/smtp/connect.rs | 139 ++++++++++++++++++++++++++++++++++------- 3 files changed, 316 insertions(+), 58 deletions(-) diff --git a/src/imap/client.rs b/src/imap/client.rs index 74d64c166e..7c334912fd 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -6,6 +6,7 @@ use async_imap::Client as ImapClient; use async_imap::Session as ImapSession; use fast_socks5::client::Socks5Stream; use tokio::io::BufWriter; +use tokio::task::JoinSet; use super::capabilities::Capabilities; use super::session::Session; @@ -14,7 +15,9 @@ use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionStream; use crate::net::tls::wrap_tls; -use crate::net::{connect_tcp_inner, connect_tls_inner, update_connection_history}; +use crate::net::{ + connect_tcp_inner, connect_tls_inner, update_connection_history, CONNECTION_DELAYS, +}; use crate::socks::Socks5Config; use crate::tools::time; @@ -106,6 +109,53 @@ impl Client { Ok(Session::new(session, capabilities)) } + async fn connection_attempt( + context: Context, + host: String, + security: ConnectionSecurity, + resolved_addr: SocketAddr, + strict_tls: bool, + ) -> Result { + let context = &context; + let host = &host; + info!( + context, + "Attempting IMAP connection to {host} ({resolved_addr})." + ); + let res = match security { + ConnectionSecurity::Tls => { + Client::connect_secure(resolved_addr, host, strict_tls).await + } + ConnectionSecurity::Starttls => { + Client::connect_starttls(resolved_addr, host, strict_tls).await + } + ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await, + }; + match res { + Ok(client) => { + let ip_addr = resolved_addr.ip().to_string(); + let port = resolved_addr.port(); + + let save_cache = match security { + ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, + ConnectionSecurity::Plain => false, + }; + if save_cache { + update_connect_timestamp(context, host, &ip_addr).await?; + } + update_connection_history(context, "imap", host, port, &ip_addr, time()).await?; + Ok(client) + } + Err(err) => { + warn!( + context, + "Failed to connect to {host} ({resolved_addr}): {err:#}." + ); + Err(err) + } + } + } + pub async fn connect( context: &Context, socks5_config: Option, @@ -131,39 +181,86 @@ impl Client { }; Ok(client) } else { - let mut first_error = None; let load_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; - for resolved_addr in - lookup_host_with_cache(context, host, port, "imap", load_cache).await? + + let mut connection_attempt_set = JoinSet::new(); + let mut delay_set = JoinSet::new(); + + let mut connection_futures = Vec::new(); + for resolved_addr in lookup_host_with_cache(context, host, port, "imap", load_cache) + .await? + .into_iter() + .rev() { - let res = match security { - ConnectionSecurity::Tls => { - Client::connect_secure(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Starttls => { - Client::connect_starttls(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await, - }; - match res { - Ok(client) => { - let ip_addr = resolved_addr.ip().to_string(); - if load_cache { - update_connect_timestamp(context, host, &ip_addr).await?; + let context = context.clone(); + let host = host.to_string(); + let fut = + Self::connection_attempt(context, host, security, resolved_addr, strict_tls); + connection_futures.push(fut); + } + + // Start with one connection. + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } + + for delay in CONNECTION_DELAYS { + delay_set.spawn(tokio::time::sleep(delay)); + } + + let mut first_error = None; + loop { + tokio::select! { + biased; + + res = connection_attempt_set.join_next() => { + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + }, + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + } + }, + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break; + } } - update_connection_history(context, "imap", host, port, &ip_addr, time()) - .await?; - return Ok(client); - } - Err(err) => { - warn!(context, "Failed to connect to {resolved_addr}: {err:#}."); - first_error.get_or_insert(err); + }, + _ = delay_set.join_next(), if !delay_set.is_empty() => { + // Delay expired. + // + // Don't do anything other than adding + // another connection attempt to the active set. } } + + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } } + + // Abort remaining connection attempts and free resources + // such as OS sockets and `Context` references + // held by connection attempt tasks. + // + // `delay_set` contains just `sleep` tasks + // so no need to await futures there, + // it is enough that futures are aborted + // when the set is dropped. + connection_attempt_set.shutdown().await; + Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) } } diff --git a/src/net.rs b/src/net.rs index 38ef5cf121..9c41ce1c93 100644 --- a/src/net.rs +++ b/src/net.rs @@ -6,6 +6,7 @@ use std::time::Duration; use anyhow::{format_err, Context as _, Result}; use async_native_tls::TlsStream; use tokio::net::TcpStream; +use tokio::task::JoinSet; use tokio::time::timeout; use tokio_io_timeout::TimeoutStream; @@ -37,6 +38,15 @@ pub(crate) const TRANSACTION_TIMEOUT: Duration = Duration::from_secs(300); /// TTL for caches in seconds. pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60; +/// Start additional connection attempts after 300 ms, 5 s, 10 s and 15 s. +/// This way we can have up to 5 parallel connection attempts at the same time. +pub(crate) const CONNECTION_DELAYS: [Duration; 4] = [ + Duration::from_millis(300), + Duration::from_secs(1), + Duration::from_secs(5), + Duration::from_secs(10), +]; + /// Removes connection history entries after `CACHE_TTL`. pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> { let now = time(); @@ -142,22 +152,78 @@ pub(crate) async fn connect_tcp( port: u16, load_cache: bool, ) -> Result>>> { + let mut connection_attempt_set = JoinSet::new(); + let mut delay_set = JoinSet::new(); + + let mut connection_futures = Vec::new(); + for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache) + .await? + .into_iter() + .rev() + { + let fut = connect_tcp_inner(resolved_addr); + connection_futures.push(fut); + } + + // Start with one connection. + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } + + for delay in CONNECTION_DELAYS { + delay_set.spawn(tokio::time::sleep(delay)); + } + let mut first_error = None; - for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache).await? { - match connect_tcp_inner(resolved_addr).await { - Ok(stream) => { - return Ok(stream); - } - Err(err) => { - warn!( - context, - "Failed to connect to {}: {:#}.", resolved_addr, err - ); - first_error.get_or_insert(err); + loop { + tokio::select! { + biased; + + res = connection_attempt_set.join_next() => { + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + }, + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + } + }, + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break; + } + } + }, + _ = delay_set.join_next(), if !delay_set.is_empty() => { + // Delay expired. + // + // Don't do anything other than adding + // another connection attempt to the active set. } } + + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } } + // Abort remaining connection attempts and free resources + // such as OS sockets and `Context` references + // held by connection attempt tasks. + // + // `delay_set` contains just `sleep` tasks + // so no need to await futures there, + // it is enough that futures are aborted + // when the set is dropped. + connection_attempt_set.shutdown().await; + Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) } diff --git a/src/smtp/connect.rs b/src/smtp/connect.rs index 80913abe6c..a9c6378a5e 100644 --- a/src/smtp/connect.rs +++ b/src/smtp/connect.rs @@ -5,13 +5,16 @@ use std::net::SocketAddr; use anyhow::{bail, format_err, Context as _, Result}; use async_smtp::{SmtpClient, SmtpTransport}; use tokio::io::BufStream; +use tokio::task::JoinSet; use crate::context::Context; use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionBufStream; use crate::net::tls::wrap_tls; -use crate::net::{connect_tcp_inner, connect_tls_inner, update_connection_history}; +use crate::net::{ + connect_tcp_inner, connect_tls_inner, update_connection_history, CONNECTION_DELAYS, +}; use crate::oauth2::get_oauth2_access_token; use crate::socks::Socks5Config; use crate::tools::time; @@ -72,6 +75,49 @@ pub(crate) async fn connect_and_auth( Ok(transport) } +async fn connection_attempt( + context: Context, + host: String, + security: ConnectionSecurity, + resolved_addr: SocketAddr, + strict_tls: bool, +) -> Result> { + let context = &context; + let host = &host; + info!( + context, + "Attempting SMTP connection to {host} ({resolved_addr})." + ); + let res = match security { + ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await, + ConnectionSecurity::Starttls => connect_starttls(resolved_addr, host, strict_tls).await, + ConnectionSecurity::Plain => connect_insecure(resolved_addr).await, + }; + match res { + Ok(stream) => { + let ip_addr = resolved_addr.ip().to_string(); + let port = resolved_addr.port(); + + let save_cache = match security { + ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, + ConnectionSecurity::Plain => false, + }; + if save_cache { + update_connect_timestamp(context, host, &ip_addr).await?; + } + update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?; + Ok(stream) + } + Err(err) => { + warn!( + context, + "Failed to connect to {host} ({resolved_addr}): {err:#}." + ); + Err(err) + } + } +} + /// Returns TLS, STARTTLS or plaintext connection /// using SOCKS5 or direct connection depending on the given configuration. /// @@ -106,37 +152,86 @@ async fn connect_stream( }; Ok(stream) } else { - let mut first_error = None; let load_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; - for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache).await? + let mut connection_attempt_set = JoinSet::new(); + let mut delay_set = JoinSet::new(); + + let mut connection_futures = Vec::new(); + + for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache) + .await? + .into_iter() + .rev() { - let res = match security { - ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await, - ConnectionSecurity::Starttls => { - connect_starttls(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Plain => connect_insecure(resolved_addr).await, - }; - match res { - Ok(stream) => { - let ip_addr = resolved_addr.ip().to_string(); - if load_cache { - update_connect_timestamp(context, host, &ip_addr).await?; + let context = context.clone(); + let host = host.to_string(); + let fut = connection_attempt(context, host, security, resolved_addr, strict_tls); + connection_futures.push(fut); + } + + // Start with one connection. + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } + + for delay in CONNECTION_DELAYS { + delay_set.spawn(tokio::time::sleep(delay)); + } + + let mut first_error = None; + loop { + tokio::select! { + biased; + + res = connection_attempt_set.join_next() => { + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + }, + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + } + }, + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break; + } } - update_connection_history(context, "smtp", host, port, &ip_addr, time()) - .await?; - return Ok(stream); - } - Err(err) => { - warn!(context, "Failed to connect to {resolved_addr}: {err:#}."); - first_error.get_or_insert(err); + }, + _ = delay_set.join_next(), if !delay_set.is_empty() => { + // Delay expired. + // + // Don't do anything other than adding + // another connection attempt to the active set. } } + + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } } + + // Abort remaining connection attempts and free resources + // such as OS sockets and `Context` references + // held by connection attempt tasks. + // + // `delay_set` contains just `sleep` tasks + // so no need to await futures there, + // it is enough that futures are aborted + // when the set is dropped. + connection_attempt_set.shutdown().await; + Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) } }