From 31d6e54a994ea78a49110bf181862ea372351de2 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 25 Aug 2024 07:45:40 +0000 Subject: [PATCH] feat: parallelize IMAP and SMTP connection attempts --- src/imap/client.rs | 151 +++++++++++++++++++++++++++++++++++++------- src/smtp/connect.rs | 144 ++++++++++++++++++++++++++++++++++++------ 2 files changed, 250 insertions(+), 45 deletions(-) diff --git a/src/imap/client.rs b/src/imap/client.rs index 74d64c166e..ea7e7ad958 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -1,11 +1,13 @@ use std::net::SocketAddr; use std::ops::{Deref, DerefMut}; +use std::time::Duration; use anyhow::{format_err, Context as _, Result}; use async_imap::Client as ImapClient; use async_imap::Session as ImapSession; use fast_socks5::client::Socks5Stream; use tokio::io::BufWriter; +use tokio::task::JoinSet; use super::capabilities::Capabilities; use super::session::Session; @@ -106,6 +108,53 @@ impl Client { Ok(Session::new(session, capabilities)) } + async fn connection_attempt( + context: Context, + host: String, + security: ConnectionSecurity, + resolved_addr: SocketAddr, + strict_tls: bool, + ) -> Result { + let context = &context; + let host = &host; + info!( + context, + "Attempting IMAP connection to {host} ({resolved_addr})." + ); + let res = match security { + ConnectionSecurity::Tls => { + Client::connect_secure(resolved_addr, host, strict_tls).await + } + ConnectionSecurity::Starttls => { + Client::connect_starttls(resolved_addr, host, strict_tls).await + } + ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await, + }; + match res { + Ok(client) => { + let ip_addr = resolved_addr.ip().to_string(); + let port = resolved_addr.port(); + + let save_cache = match security { + ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, + ConnectionSecurity::Plain => false, + }; + if save_cache { + update_connect_timestamp(context, host, &ip_addr).await?; + } + update_connection_history(context, "imap", host, port, &ip_addr, time()).await?; + Ok(client) + } + Err(err) => { + warn!( + context, + "Failed to connect to {host} ({resolved_addr}): {err:#}." + ); + Err(err) + } + } + } + pub async fn connect( context: &Context, socks5_config: Option, @@ -131,39 +180,93 @@ impl Client { }; Ok(client) } else { - let mut first_error = None; let load_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; - for resolved_addr in - lookup_host_with_cache(context, host, port, "imap", load_cache).await? + + let mut connection_attempt_set = JoinSet::new(); + let mut delay_set = JoinSet::new(); + + let mut connection_futures = Vec::new(); + for resolved_addr in lookup_host_with_cache(context, host, port, "imap", load_cache) + .await? + .into_iter() + .rev() { - let res = match security { - ConnectionSecurity::Tls => { - Client::connect_secure(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Starttls => { - Client::connect_starttls(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await, - }; - match res { - Ok(client) => { - let ip_addr = resolved_addr.ip().to_string(); - if load_cache { - update_connect_timestamp(context, host, &ip_addr).await?; + let context = context.clone(); + let host = host.to_string(); + let fut = Box::pin(Self::connection_attempt( + context, + host, + security, + resolved_addr, + strict_tls, + )); + connection_futures.push(fut); + } + + // Start with one connection. + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } + + // Start second connection attempt 300 ms after the first. + delay_set.spawn(tokio::time::sleep(Duration::from_millis(300))); + + // Start third connection attempt if we have not managed to connect in 10 seconds. + delay_set.spawn(tokio::time::sleep(Duration::from_secs(10))); + + let mut first_error = None; + loop { + tokio::select! { + biased; + + res = connection_attempt_set.join_next() => { + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + }, + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + } + }, + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break; + } } - update_connection_history(context, "imap", host, port, &ip_addr, time()) - .await?; - return Ok(client); - } - Err(err) => { - warn!(context, "Failed to connect to {resolved_addr}: {err:#}."); - first_error.get_or_insert(err); + }, + _ = delay_set.join_next() => { + // Delay expired. + // + // Don't do anything other than adding + // another connection attempt to the active set. } } + + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } } + + // Abort remaining connection attempts and free resources + // such as OS sockets and `Context` references + // held by connection attempt tasks. + // + // `delay_set` contains just `sleep` tasks + // so no need to await futures there, + // it is enough that futures are aborted + // when the set is dropped. + connection_attempt_set.shutdown().await; + Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) } } diff --git a/src/smtp/connect.rs b/src/smtp/connect.rs index 80913abe6c..2fd93018a0 100644 --- a/src/smtp/connect.rs +++ b/src/smtp/connect.rs @@ -1,10 +1,12 @@ //! SMTP connection establishment. use std::net::SocketAddr; +use std::time::Duration; use anyhow::{bail, format_err, Context as _, Result}; use async_smtp::{SmtpClient, SmtpTransport}; use tokio::io::BufStream; +use tokio::task::JoinSet; use crate::context::Context; use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; @@ -72,6 +74,49 @@ pub(crate) async fn connect_and_auth( Ok(transport) } +async fn connection_attempt( + context: Context, + host: String, + security: ConnectionSecurity, + resolved_addr: SocketAddr, + strict_tls: bool, +) -> Result> { + let context = &context; + let host = &host; + info!( + context, + "Attempting SMTP connection to {host} ({resolved_addr})." + ); + let res = match security { + ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await, + ConnectionSecurity::Starttls => connect_starttls(resolved_addr, host, strict_tls).await, + ConnectionSecurity::Plain => connect_insecure(resolved_addr).await, + }; + match res { + Ok(stream) => { + let ip_addr = resolved_addr.ip().to_string(); + let port = resolved_addr.port(); + + let save_cache = match security { + ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, + ConnectionSecurity::Plain => false, + }; + if save_cache { + update_connect_timestamp(context, host, &ip_addr).await?; + } + update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?; + Ok(stream) + } + Err(err) => { + warn!( + context, + "Failed to connect to {host} ({resolved_addr}): {err:#}." + ); + Err(err) + } + } +} + /// Returns TLS, STARTTLS or plaintext connection /// using SOCKS5 or direct connection depending on the given configuration. /// @@ -106,37 +151,94 @@ async fn connect_stream( }; Ok(stream) } else { - let mut first_error = None; let load_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; - for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache).await? + let mut connection_attempt_set = JoinSet::new(); + let mut delay_set = JoinSet::new(); + + let mut connection_futures = Vec::new(); + + for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache) + .await? + .into_iter() + .rev() { - let res = match security { - ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await, - ConnectionSecurity::Starttls => { - connect_starttls(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Plain => connect_insecure(resolved_addr).await, - }; - match res { - Ok(stream) => { - let ip_addr = resolved_addr.ip().to_string(); - if load_cache { - update_connect_timestamp(context, host, &ip_addr).await?; + let context = context.clone(); + let host = host.to_string(); + let fut = Box::pin(connection_attempt( + context, + host, + security, + resolved_addr, + strict_tls, + )); + connection_futures.push(fut); + } + + // Start with one connection. + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } + + // Start second connection attempt 300 ms after the first. + delay_set.spawn(tokio::time::sleep(Duration::from_millis(300))); + + // Start third connection attempt if we have not managed to connect in 10 seconds. + delay_set.spawn(tokio::time::sleep(Duration::from_secs(10))); + + let mut first_error = None; + loop { + tokio::select! { + biased; + + res = connection_attempt_set.join_next() => { + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + }, + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + } + }, + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break; + } } - update_connection_history(context, "smtp", host, port, &ip_addr, time()) - .await?; - return Ok(stream); - } - Err(err) => { - warn!(context, "Failed to connect to {resolved_addr}: {err:#}."); - first_error.get_or_insert(err); + }, + _ = delay_set.join_next() => { + // Delay expired. + // + // Don't do anything other than adding + // another connection attempt to the active set. } } + + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } } + + // Abort remaining connection attempts and free resources + // such as OS sockets and `Context` references + // held by connection attempt tasks. + // + // `delay_set` contains just `sleep` tasks + // so no need to await futures there, + // it is enough that futures are aborted + // when the set is dropped. + connection_attempt_set.shutdown().await; + Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) } }