From 42dc9621c047ebbe19c222b8dd3071540c89280c Mon Sep 17 00:00:00 2001 From: link2xt Date: Sun, 25 Aug 2024 07:45:40 +0000 Subject: [PATCH 01/11] feat: parallelize IMAP and SMTP connection attempts --- src/imap/client.rs | 147 ++++++++++++++++++++++++++++++++++++-------- src/net.rs | 88 ++++++++++++++++++++++---- src/smtp/connect.rs | 139 ++++++++++++++++++++++++++++++++++------- 3 files changed, 316 insertions(+), 58 deletions(-) diff --git a/src/imap/client.rs b/src/imap/client.rs index 74d64c166e..7c334912fd 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -6,6 +6,7 @@ use async_imap::Client as ImapClient; use async_imap::Session as ImapSession; use fast_socks5::client::Socks5Stream; use tokio::io::BufWriter; +use tokio::task::JoinSet; use super::capabilities::Capabilities; use super::session::Session; @@ -14,7 +15,9 @@ use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionStream; use crate::net::tls::wrap_tls; -use crate::net::{connect_tcp_inner, connect_tls_inner, update_connection_history}; +use crate::net::{ + connect_tcp_inner, connect_tls_inner, update_connection_history, CONNECTION_DELAYS, +}; use crate::socks::Socks5Config; use crate::tools::time; @@ -106,6 +109,53 @@ impl Client { Ok(Session::new(session, capabilities)) } + async fn connection_attempt( + context: Context, + host: String, + security: ConnectionSecurity, + resolved_addr: SocketAddr, + strict_tls: bool, + ) -> Result { + let context = &context; + let host = &host; + info!( + context, + "Attempting IMAP connection to {host} ({resolved_addr})." + ); + let res = match security { + ConnectionSecurity::Tls => { + Client::connect_secure(resolved_addr, host, strict_tls).await + } + ConnectionSecurity::Starttls => { + Client::connect_starttls(resolved_addr, host, strict_tls).await + } + ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await, + }; + match res { + Ok(client) => { + let ip_addr = resolved_addr.ip().to_string(); + let port = resolved_addr.port(); + + let save_cache = match security { + ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, + ConnectionSecurity::Plain => false, + }; + if save_cache { + update_connect_timestamp(context, host, &ip_addr).await?; + } + update_connection_history(context, "imap", host, port, &ip_addr, time()).await?; + Ok(client) + } + Err(err) => { + warn!( + context, + "Failed to connect to {host} ({resolved_addr}): {err:#}." + ); + Err(err) + } + } + } + pub async fn connect( context: &Context, socks5_config: Option, @@ -131,39 +181,86 @@ impl Client { }; Ok(client) } else { - let mut first_error = None; let load_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; - for resolved_addr in - lookup_host_with_cache(context, host, port, "imap", load_cache).await? + + let mut connection_attempt_set = JoinSet::new(); + let mut delay_set = JoinSet::new(); + + let mut connection_futures = Vec::new(); + for resolved_addr in lookup_host_with_cache(context, host, port, "imap", load_cache) + .await? + .into_iter() + .rev() { - let res = match security { - ConnectionSecurity::Tls => { - Client::connect_secure(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Starttls => { - Client::connect_starttls(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await, - }; - match res { - Ok(client) => { - let ip_addr = resolved_addr.ip().to_string(); - if load_cache { - update_connect_timestamp(context, host, &ip_addr).await?; + let context = context.clone(); + let host = host.to_string(); + let fut = + Self::connection_attempt(context, host, security, resolved_addr, strict_tls); + connection_futures.push(fut); + } + + // Start with one connection. + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } + + for delay in CONNECTION_DELAYS { + delay_set.spawn(tokio::time::sleep(delay)); + } + + let mut first_error = None; + loop { + tokio::select! { + biased; + + res = connection_attempt_set.join_next() => { + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + }, + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + } + }, + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break; + } } - update_connection_history(context, "imap", host, port, &ip_addr, time()) - .await?; - return Ok(client); - } - Err(err) => { - warn!(context, "Failed to connect to {resolved_addr}: {err:#}."); - first_error.get_or_insert(err); + }, + _ = delay_set.join_next(), if !delay_set.is_empty() => { + // Delay expired. + // + // Don't do anything other than adding + // another connection attempt to the active set. } } + + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } } + + // Abort remaining connection attempts and free resources + // such as OS sockets and `Context` references + // held by connection attempt tasks. + // + // `delay_set` contains just `sleep` tasks + // so no need to await futures there, + // it is enough that futures are aborted + // when the set is dropped. + connection_attempt_set.shutdown().await; + Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) } } diff --git a/src/net.rs b/src/net.rs index 38ef5cf121..9c41ce1c93 100644 --- a/src/net.rs +++ b/src/net.rs @@ -6,6 +6,7 @@ use std::time::Duration; use anyhow::{format_err, Context as _, Result}; use async_native_tls::TlsStream; use tokio::net::TcpStream; +use tokio::task::JoinSet; use tokio::time::timeout; use tokio_io_timeout::TimeoutStream; @@ -37,6 +38,15 @@ pub(crate) const TRANSACTION_TIMEOUT: Duration = Duration::from_secs(300); /// TTL for caches in seconds. pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60; +/// Start additional connection attempts after 300 ms, 5 s, 10 s and 15 s. +/// This way we can have up to 5 parallel connection attempts at the same time. +pub(crate) const CONNECTION_DELAYS: [Duration; 4] = [ + Duration::from_millis(300), + Duration::from_secs(1), + Duration::from_secs(5), + Duration::from_secs(10), +]; + /// Removes connection history entries after `CACHE_TTL`. pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> { let now = time(); @@ -142,22 +152,78 @@ pub(crate) async fn connect_tcp( port: u16, load_cache: bool, ) -> Result>>> { + let mut connection_attempt_set = JoinSet::new(); + let mut delay_set = JoinSet::new(); + + let mut connection_futures = Vec::new(); + for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache) + .await? + .into_iter() + .rev() + { + let fut = connect_tcp_inner(resolved_addr); + connection_futures.push(fut); + } + + // Start with one connection. + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } + + for delay in CONNECTION_DELAYS { + delay_set.spawn(tokio::time::sleep(delay)); + } + let mut first_error = None; - for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache).await? { - match connect_tcp_inner(resolved_addr).await { - Ok(stream) => { - return Ok(stream); - } - Err(err) => { - warn!( - context, - "Failed to connect to {}: {:#}.", resolved_addr, err - ); - first_error.get_or_insert(err); + loop { + tokio::select! { + biased; + + res = connection_attempt_set.join_next() => { + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + }, + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + } + }, + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break; + } + } + }, + _ = delay_set.join_next(), if !delay_set.is_empty() => { + // Delay expired. + // + // Don't do anything other than adding + // another connection attempt to the active set. } } + + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } } + // Abort remaining connection attempts and free resources + // such as OS sockets and `Context` references + // held by connection attempt tasks. + // + // `delay_set` contains just `sleep` tasks + // so no need to await futures there, + // it is enough that futures are aborted + // when the set is dropped. + connection_attempt_set.shutdown().await; + Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) } diff --git a/src/smtp/connect.rs b/src/smtp/connect.rs index 80913abe6c..a9c6378a5e 100644 --- a/src/smtp/connect.rs +++ b/src/smtp/connect.rs @@ -5,13 +5,16 @@ use std::net::SocketAddr; use anyhow::{bail, format_err, Context as _, Result}; use async_smtp::{SmtpClient, SmtpTransport}; use tokio::io::BufStream; +use tokio::task::JoinSet; use crate::context::Context; use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionBufStream; use crate::net::tls::wrap_tls; -use crate::net::{connect_tcp_inner, connect_tls_inner, update_connection_history}; +use crate::net::{ + connect_tcp_inner, connect_tls_inner, update_connection_history, CONNECTION_DELAYS, +}; use crate::oauth2::get_oauth2_access_token; use crate::socks::Socks5Config; use crate::tools::time; @@ -72,6 +75,49 @@ pub(crate) async fn connect_and_auth( Ok(transport) } +async fn connection_attempt( + context: Context, + host: String, + security: ConnectionSecurity, + resolved_addr: SocketAddr, + strict_tls: bool, +) -> Result> { + let context = &context; + let host = &host; + info!( + context, + "Attempting SMTP connection to {host} ({resolved_addr})." + ); + let res = match security { + ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await, + ConnectionSecurity::Starttls => connect_starttls(resolved_addr, host, strict_tls).await, + ConnectionSecurity::Plain => connect_insecure(resolved_addr).await, + }; + match res { + Ok(stream) => { + let ip_addr = resolved_addr.ip().to_string(); + let port = resolved_addr.port(); + + let save_cache = match security { + ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, + ConnectionSecurity::Plain => false, + }; + if save_cache { + update_connect_timestamp(context, host, &ip_addr).await?; + } + update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?; + Ok(stream) + } + Err(err) => { + warn!( + context, + "Failed to connect to {host} ({resolved_addr}): {err:#}." + ); + Err(err) + } + } +} + /// Returns TLS, STARTTLS or plaintext connection /// using SOCKS5 or direct connection depending on the given configuration. /// @@ -106,37 +152,86 @@ async fn connect_stream( }; Ok(stream) } else { - let mut first_error = None; let load_cache = match security { ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls, ConnectionSecurity::Plain => false, }; - for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache).await? + let mut connection_attempt_set = JoinSet::new(); + let mut delay_set = JoinSet::new(); + + let mut connection_futures = Vec::new(); + + for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache) + .await? + .into_iter() + .rev() { - let res = match security { - ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await, - ConnectionSecurity::Starttls => { - connect_starttls(resolved_addr, host, strict_tls).await - } - ConnectionSecurity::Plain => connect_insecure(resolved_addr).await, - }; - match res { - Ok(stream) => { - let ip_addr = resolved_addr.ip().to_string(); - if load_cache { - update_connect_timestamp(context, host, &ip_addr).await?; + let context = context.clone(); + let host = host.to_string(); + let fut = connection_attempt(context, host, security, resolved_addr, strict_tls); + connection_futures.push(fut); + } + + // Start with one connection. + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } + + for delay in CONNECTION_DELAYS { + delay_set.spawn(tokio::time::sleep(delay)); + } + + let mut first_error = None; + loop { + tokio::select! { + biased; + + res = connection_attempt_set.join_next() => { + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + }, + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); + } + } + }, + None => { + // Out of connection attempts. + // + // Break out of the loop and return error. + break; + } } - update_connection_history(context, "smtp", host, port, &ip_addr, time()) - .await?; - return Ok(stream); - } - Err(err) => { - warn!(context, "Failed to connect to {resolved_addr}: {err:#}."); - first_error.get_or_insert(err); + }, + _ = delay_set.join_next(), if !delay_set.is_empty() => { + // Delay expired. + // + // Don't do anything other than adding + // another connection attempt to the active set. } } + + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } } + + // Abort remaining connection attempts and free resources + // such as OS sockets and `Context` references + // held by connection attempt tasks. + // + // `delay_set` contains just `sleep` tasks + // so no need to await futures there, + // it is enough that futures are aborted + // when the set is dropped. + connection_attempt_set.shutdown().await; + Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) } } From 9cac48deeddd3c5c1b46e109e232e631c883fcaa Mon Sep 17 00:00:00 2001 From: Hocuri Date: Wed, 28 Aug 2024 21:00:03 +0200 Subject: [PATCH 02/11] Simplifications: Use timeout() instead of select! and delay_set --- src/imap/client.rs | 77 ++++++++++++++++++--------------------------- src/net.rs | 74 +++++++++++++++++-------------------------- src/smtp/connect.rs | 76 ++++++++++++++++++-------------------------- 3 files changed, 89 insertions(+), 138 deletions(-) diff --git a/src/imap/client.rs b/src/imap/client.rs index 7c334912fd..f9a0fd3bdb 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -1,5 +1,6 @@ use std::net::SocketAddr; use std::ops::{Deref, DerefMut}; +use std::time::Duration; use anyhow::{format_err, Context as _, Result}; use async_imap::Client as ImapClient; @@ -7,6 +8,7 @@ use async_imap::Session as ImapSession; use fast_socks5::client::Socks5Stream; use tokio::io::BufWriter; use tokio::task::JoinSet; +use tokio::time::timeout; use super::capabilities::Capabilities; use super::session::Session; @@ -187,7 +189,6 @@ impl Client { }; let mut connection_attempt_set = JoinSet::new(); - let mut delay_set = JoinSet::new(); let mut connection_futures = Vec::new(); for resolved_addr in lookup_host_with_cache(context, host, port, "imap", load_cache) @@ -202,63 +203,47 @@ impl Client { connection_futures.push(fut); } - // Start with one connection. - if let Some(fut) = connection_futures.pop() { - connection_attempt_set.spawn(fut); - } - - for delay in CONNECTION_DELAYS { - delay_set.spawn(tokio::time::sleep(delay)); - } - + let mut delays = CONNECTION_DELAYS.into_iter(); let mut first_error = None; + loop { - tokio::select! { - biased; - - res = connection_attempt_set.join_next() => { - match res { - Some(res) => { - match res.context("Failed to join task")? { - Ok(conn) => { - // Successfully connected. - return Ok(conn); - }, - Err(err) => { - // Some connection attempt failed. - first_error.get_or_insert(err); - } - } - }, - None => { - // Out of connection attempts. - // - // Break out of the loop and return error. - break; + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } + + let one_year = Duration::from_secs(60 * 60 * 24 * 365); + let delay = delays.next().unwrap_or(one_year); // one year can be treated as infinitely long here + let Ok(res) = timeout(delay, connection_attempt_set.join_next()).await else { + // The delay for starting the next connection attempt has expired. + // `continue` the loop to push the next connection into connection_attempt_set. + continue; + }; + + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + } + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); } } - }, - _ = delay_set.join_next(), if !delay_set.is_empty() => { - // Delay expired. + } + None => { + // Out of connection attempts. // - // Don't do anything other than adding - // another connection attempt to the active set. + // Break out of the loop and return error. + break; } } - - if let Some(fut) = connection_futures.pop() { - connection_attempt_set.spawn(fut); - } } // Abort remaining connection attempts and free resources // such as OS sockets and `Context` references // held by connection attempt tasks. - // - // `delay_set` contains just `sleep` tasks - // so no need to await futures there, - // it is enough that futures are aborted - // when the set is dropped. connection_attempt_set.shutdown().await; Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) diff --git a/src/net.rs b/src/net.rs index 9c41ce1c93..b7911545ff 100644 --- a/src/net.rs +++ b/src/net.rs @@ -153,7 +153,6 @@ pub(crate) async fn connect_tcp( load_cache: bool, ) -> Result>>> { let mut connection_attempt_set = JoinSet::new(); - let mut delay_set = JoinSet::new(); let mut connection_futures = Vec::new(); for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache) @@ -165,64 +164,47 @@ pub(crate) async fn connect_tcp( connection_futures.push(fut); } - // Start with one connection. - if let Some(fut) = connection_futures.pop() { - connection_attempt_set.spawn(fut); - } - - for delay in CONNECTION_DELAYS { - delay_set.spawn(tokio::time::sleep(delay)); - } - + let mut delays = CONNECTION_DELAYS.into_iter(); let mut first_error = None; loop { - tokio::select! { - biased; - - res = connection_attempt_set.join_next() => { - match res { - Some(res) => { - match res.context("Failed to join task")? { - Ok(conn) => { - // Successfully connected. - return Ok(conn); - }, - Err(err) => { - // Some connection attempt failed. - first_error.get_or_insert(err); - } - } - }, - None => { - // Out of connection attempts. - // - // Break out of the loop and return error. - break; + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } + + let one_year = Duration::from_secs(60 * 60 * 24 * 365); + let delay = delays.next().unwrap_or(one_year); // one year can be treated as infinitely long here + let Ok(res) = timeout(delay, connection_attempt_set.join_next()).await else { + // The delay for starting the next connection attempt has expired. + // `continue` the loop to push the next connection into connection_attempt_set. + continue; + }; + + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + } + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); } } - }, - _ = delay_set.join_next(), if !delay_set.is_empty() => { - // Delay expired. + } + None => { + // Out of connection attempts. // - // Don't do anything other than adding - // another connection attempt to the active set. + // Break out of the loop and return error. + break; } } - - if let Some(fut) = connection_futures.pop() { - connection_attempt_set.spawn(fut); - } } // Abort remaining connection attempts and free resources // such as OS sockets and `Context` references // held by connection attempt tasks. - // - // `delay_set` contains just `sleep` tasks - // so no need to await futures there, - // it is enough that futures are aborted - // when the set is dropped. connection_attempt_set.shutdown().await; Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) diff --git a/src/smtp/connect.rs b/src/smtp/connect.rs index a9c6378a5e..006dde25d1 100644 --- a/src/smtp/connect.rs +++ b/src/smtp/connect.rs @@ -1,11 +1,13 @@ //! SMTP connection establishment. use std::net::SocketAddr; +use std::time::Duration; use anyhow::{bail, format_err, Context as _, Result}; use async_smtp::{SmtpClient, SmtpTransport}; use tokio::io::BufStream; use tokio::task::JoinSet; +use tokio::time::timeout; use crate::context::Context; use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; @@ -158,7 +160,6 @@ async fn connect_stream( }; let mut connection_attempt_set = JoinSet::new(); - let mut delay_set = JoinSet::new(); let mut connection_futures = Vec::new(); @@ -173,65 +174,48 @@ async fn connect_stream( connection_futures.push(fut); } - // Start with one connection. - if let Some(fut) = connection_futures.pop() { - connection_attempt_set.spawn(fut); - } - - for delay in CONNECTION_DELAYS { - delay_set.spawn(tokio::time::sleep(delay)); - } - + let mut delays = CONNECTION_DELAYS.into_iter(); let mut first_error = None; + loop { - tokio::select! { - biased; + if let Some(fut) = connection_futures.pop() { + connection_attempt_set.spawn(fut); + } - res = connection_attempt_set.join_next() => { - match res { - Some(res) => { - match res.context("Failed to join task")? { - Ok(conn) => { - // Successfully connected. - return Ok(conn); - }, - Err(err) => { - // Some connection attempt failed. - first_error.get_or_insert(err); - } - } - }, - None => { - // Out of connection attempts. - // - // Break out of the loop and return error. - break; + let one_year = Duration::from_secs(60 * 60 * 24 * 365); + let delay = delays.next().unwrap_or(one_year); // one year can be treated as infinitely long here + let Ok(res) = timeout(delay, connection_attempt_set.join_next()).await else { + // The delay for starting the next connection attempt has expired. + // `continue` the loop to push the next connection into connection_attempt_set. + continue; + }; + + match res { + Some(res) => { + match res.context("Failed to join task")? { + Ok(conn) => { + // Successfully connected. + return Ok(conn); + } + Err(err) => { + // Some connection attempt failed. + first_error.get_or_insert(err); } } - }, - _ = delay_set.join_next(), if !delay_set.is_empty() => { - // Delay expired. + } + None => { + // Out of connection attempts. // - // Don't do anything other than adding - // another connection attempt to the active set. + // Break out of the loop and return error. + break; } } - - if let Some(fut) = connection_futures.pop() { - connection_attempt_set.spawn(fut); - } } // Abort remaining connection attempts and free resources // such as OS sockets and `Context` references // held by connection attempt tasks. - // - // `delay_set` contains just `sleep` tasks - // so no need to await futures there, - // it is enough that futures are aborted - // when the set is dropped. connection_attempt_set.shutdown().await; - Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) } } From fa3b5330fe15fc69129a50878f453905d8051e0c Mon Sep 17 00:00:00 2001 From: Hocuri Date: Wed, 28 Aug 2024 21:05:42 +0200 Subject: [PATCH 03/11] Simplify code by building an iterator instead of Vec --- src/smtp/connect.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/smtp/connect.rs b/src/smtp/connect.rs index 006dde25d1..475264db7c 100644 --- a/src/smtp/connect.rs +++ b/src/smtp/connect.rs @@ -161,24 +161,21 @@ async fn connect_stream( let mut connection_attempt_set = JoinSet::new(); - let mut connection_futures = Vec::new(); - - for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache) - .await? - .into_iter() - .rev() - { - let context = context.clone(); - let host = host.to_string(); - let fut = connection_attempt(context, host, security, resolved_addr, strict_tls); - connection_futures.push(fut); - } + let mut connection_futures = + lookup_host_with_cache(context, host, port, "smtp", load_cache) + .await? + .into_iter() + .map(|resolved_addr| { + let context = context.clone(); + let host = host.to_string(); + connection_attempt(context, host, security, resolved_addr, strict_tls) + }); let mut delays = CONNECTION_DELAYS.into_iter(); let mut first_error = None; loop { - if let Some(fut) = connection_futures.pop() { + if let Some(fut) = connection_futures.next() { connection_attempt_set.spawn(fut); } From 9dc1a725c1503a7a83d2bb7445332a9ccc899660 Mon Sep 17 00:00:00 2001 From: Hocuri Date: Wed, 28 Aug 2024 21:08:16 +0200 Subject: [PATCH 04/11] fix comment --- src/net.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net.rs b/src/net.rs index b7911545ff..b1904e805b 100644 --- a/src/net.rs +++ b/src/net.rs @@ -38,7 +38,7 @@ pub(crate) const TRANSACTION_TIMEOUT: Duration = Duration::from_secs(300); /// TTL for caches in seconds. pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60; -/// Start additional connection attempts after 300 ms, 5 s, 10 s and 15 s. +/// Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s. /// This way we can have up to 5 parallel connection attempts at the same time. pub(crate) const CONNECTION_DELAYS: [Duration; 4] = [ Duration::from_millis(300), From a520c628d18538e9d58fc8a5d65721a6171ee0e9 Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 28 Aug 2024 20:22:33 +0000 Subject: [PATCH 05/11] Remove duplicate code --- src/imap/client.rs | 76 +++++++-------------------------------------- src/net.rs | 54 +++++++++++++++++--------------- src/smtp/connect.rs | 71 +++++++----------------------------------- 3 files changed, 52 insertions(+), 149 deletions(-) diff --git a/src/imap/client.rs b/src/imap/client.rs index f9a0fd3bdb..e07ac52585 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -1,14 +1,11 @@ use std::net::SocketAddr; use std::ops::{Deref, DerefMut}; -use std::time::Duration; -use anyhow::{format_err, Context as _, Result}; +use anyhow::{Context as _, Result}; use async_imap::Client as ImapClient; use async_imap::Session as ImapSession; use fast_socks5::client::Socks5Stream; use tokio::io::BufWriter; -use tokio::task::JoinSet; -use tokio::time::timeout; use super::capabilities::Capabilities; use super::session::Session; @@ -18,7 +15,7 @@ use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionStream; use crate::net::tls::wrap_tls; use crate::net::{ - connect_tcp_inner, connect_tls_inner, update_connection_history, CONNECTION_DELAYS, + connect_tcp_inner, connect_tls_inner, run_futures_with_delays, update_connection_history, }; use crate::socks::Socks5Config; use crate::tools::time; @@ -188,65 +185,16 @@ impl Client { ConnectionSecurity::Plain => false, }; - let mut connection_attempt_set = JoinSet::new(); - - let mut connection_futures = Vec::new(); - for resolved_addr in lookup_host_with_cache(context, host, port, "imap", load_cache) - .await? - .into_iter() - .rev() - { - let context = context.clone(); - let host = host.to_string(); - let fut = - Self::connection_attempt(context, host, security, resolved_addr, strict_tls); - connection_futures.push(fut); - } - - let mut delays = CONNECTION_DELAYS.into_iter(); - let mut first_error = None; - - loop { - if let Some(fut) = connection_futures.pop() { - connection_attempt_set.spawn(fut); - } - - let one_year = Duration::from_secs(60 * 60 * 24 * 365); - let delay = delays.next().unwrap_or(one_year); // one year can be treated as infinitely long here - let Ok(res) = timeout(delay, connection_attempt_set.join_next()).await else { - // The delay for starting the next connection attempt has expired. - // `continue` the loop to push the next connection into connection_attempt_set. - continue; - }; - - match res { - Some(res) => { - match res.context("Failed to join task")? { - Ok(conn) => { - // Successfully connected. - return Ok(conn); - } - Err(err) => { - // Some connection attempt failed. - first_error.get_or_insert(err); - } - } - } - None => { - // Out of connection attempts. - // - // Break out of the loop and return error. - break; - } - } - } - - // Abort remaining connection attempts and free resources - // such as OS sockets and `Context` references - // held by connection attempt tasks. - connection_attempt_set.shutdown().await; - - Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) + let connection_futures = + lookup_host_with_cache(context, host, port, "imap", load_cache) + .await? + .into_iter() + .map(|resolved_addr| { + let context = context.clone(); + let host = host.to_string(); + Self::connection_attempt(context, host, security, resolved_addr, strict_tls) + }); + run_futures_with_delays(connection_futures).await } } diff --git a/src/net.rs b/src/net.rs index b1904e805b..ed63038fd5 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,4 +1,5 @@ //! # Common network utilities. +use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; use std::time::Duration; @@ -40,7 +41,7 @@ pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60; /// Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s. /// This way we can have up to 5 parallel connection attempts at the same time. -pub(crate) const CONNECTION_DELAYS: [Duration; 4] = [ +const CONNECTION_DELAYS: [Duration; 4] = [ Duration::from_millis(300), Duration::from_secs(1), Duration::from_secs(5), @@ -140,35 +141,19 @@ pub(crate) async fn connect_tls_inner( Ok(tls_stream) } -/// If `load_cache` is true, may use cached DNS results. -/// Because the cache may be poisoned with incorrect results by networks hijacking DNS requests, -/// this option should only be used when connection is authenticated, -/// for example using TLS. -/// If TLS is not used or invalid TLS certificates are allowed, -/// this option should be disabled. -pub(crate) async fn connect_tcp( - context: &Context, - host: &str, - port: u16, - load_cache: bool, -) -> Result>>> { +pub(crate) async fn run_futures_with_delays(mut futures: I) -> Result +where + I: Iterator, + F: Future> + Send + 'static, + O: Send + 'static, +{ let mut connection_attempt_set = JoinSet::new(); - let mut connection_futures = Vec::new(); - for resolved_addr in lookup_host_with_cache(context, host, port, "", load_cache) - .await? - .into_iter() - .rev() - { - let fut = connect_tcp_inner(resolved_addr); - connection_futures.push(fut); - } - let mut delays = CONNECTION_DELAYS.into_iter(); let mut first_error = None; loop { - if let Some(fut) = connection_futures.pop() { + if let Some(fut) = futures.next() { connection_attempt_set.spawn(fut); } @@ -207,5 +192,24 @@ pub(crate) async fn connect_tcp( // held by connection attempt tasks. connection_attempt_set.shutdown().await; - Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) + Err(first_error.unwrap_or_else(|| format_err!("No DNS resolution results"))) +} + +/// If `load_cache` is true, may use cached DNS results. +/// Because the cache may be poisoned with incorrect results by networks hijacking DNS requests, +/// this option should only be used when connection is authenticated, +/// for example using TLS. +/// If TLS is not used or invalid TLS certificates are allowed, +/// this option should be disabled. +pub(crate) async fn connect_tcp( + context: &Context, + host: &str, + port: u16, + load_cache: bool, +) -> Result>>> { + let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache) + .await? + .into_iter() + .map(|resolved_addr| connect_tcp_inner(resolved_addr)); + run_futures_with_delays(connection_futures).await } diff --git a/src/smtp/connect.rs b/src/smtp/connect.rs index 475264db7c..e0af589afa 100644 --- a/src/smtp/connect.rs +++ b/src/smtp/connect.rs @@ -1,13 +1,10 @@ //! SMTP connection establishment. use std::net::SocketAddr; -use std::time::Duration; -use anyhow::{bail, format_err, Context as _, Result}; +use anyhow::{bail, Context as _, Result}; use async_smtp::{SmtpClient, SmtpTransport}; use tokio::io::BufStream; -use tokio::task::JoinSet; -use tokio::time::timeout; use crate::context::Context; use crate::login_param::{ConnectionCandidate, ConnectionSecurity}; @@ -15,7 +12,7 @@ use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionBufStream; use crate::net::tls::wrap_tls; use crate::net::{ - connect_tcp_inner, connect_tls_inner, update_connection_history, CONNECTION_DELAYS, + connect_tcp_inner, connect_tls_inner, run_futures_with_delays, update_connection_history, }; use crate::oauth2::get_oauth2_access_token; use crate::socks::Socks5Config; @@ -159,61 +156,15 @@ async fn connect_stream( ConnectionSecurity::Plain => false, }; - let mut connection_attempt_set = JoinSet::new(); - - let mut connection_futures = - lookup_host_with_cache(context, host, port, "smtp", load_cache) - .await? - .into_iter() - .map(|resolved_addr| { - let context = context.clone(); - let host = host.to_string(); - connection_attempt(context, host, security, resolved_addr, strict_tls) - }); - - let mut delays = CONNECTION_DELAYS.into_iter(); - let mut first_error = None; - - loop { - if let Some(fut) = connection_futures.next() { - connection_attempt_set.spawn(fut); - } - - let one_year = Duration::from_secs(60 * 60 * 24 * 365); - let delay = delays.next().unwrap_or(one_year); // one year can be treated as infinitely long here - let Ok(res) = timeout(delay, connection_attempt_set.join_next()).await else { - // The delay for starting the next connection attempt has expired. - // `continue` the loop to push the next connection into connection_attempt_set. - continue; - }; - - match res { - Some(res) => { - match res.context("Failed to join task")? { - Ok(conn) => { - // Successfully connected. - return Ok(conn); - } - Err(err) => { - // Some connection attempt failed. - first_error.get_or_insert(err); - } - } - } - None => { - // Out of connection attempts. - // - // Break out of the loop and return error. - break; - } - } - } - - // Abort remaining connection attempts and free resources - // such as OS sockets and `Context` references - // held by connection attempt tasks. - connection_attempt_set.shutdown().await; - Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}"))) + let connection_futures = lookup_host_with_cache(context, host, port, "smtp", load_cache) + .await? + .into_iter() + .map(|resolved_addr| { + let context = context.clone(); + let host = host.to_string(); + connection_attempt(context, host, security, resolved_addr, strict_tls) + }); + run_futures_with_delays(connection_futures).await } } From f2873fe62cc255ef23449aaa70974296d4029406 Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 28 Aug 2024 20:36:52 +0000 Subject: [PATCH 06/11] Document run_connection_attempts() --- src/imap/client.rs | 4 ++-- src/net.rs | 34 ++++++++++++++++++++++------------ src/smtp/connect.rs | 4 ++-- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/src/imap/client.rs b/src/imap/client.rs index e07ac52585..b6b6ffeb37 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -15,7 +15,7 @@ use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionStream; use crate::net::tls::wrap_tls; use crate::net::{ - connect_tcp_inner, connect_tls_inner, run_futures_with_delays, update_connection_history, + connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history, }; use crate::socks::Socks5Config; use crate::tools::time; @@ -194,7 +194,7 @@ impl Client { let host = host.to_string(); Self::connection_attempt(context, host, security, resolved_addr, strict_tls) }); - run_futures_with_delays(connection_futures).await + run_connection_attempts(connection_futures).await } } diff --git a/src/net.rs b/src/net.rs index ed63038fd5..9ac779c3ed 100644 --- a/src/net.rs +++ b/src/net.rs @@ -39,15 +39,6 @@ pub(crate) const TRANSACTION_TIMEOUT: Duration = Duration::from_secs(300); /// TTL for caches in seconds. pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60; -/// Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s. -/// This way we can have up to 5 parallel connection attempts at the same time. -const CONNECTION_DELAYS: [Duration; 4] = [ - Duration::from_millis(300), - Duration::from_secs(1), - Duration::from_secs(5), - Duration::from_secs(10), -]; - /// Removes connection history entries after `CACHE_TTL`. pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> { let now = time(); @@ -141,7 +132,17 @@ pub(crate) async fn connect_tls_inner( Ok(tls_stream) } -pub(crate) async fn run_futures_with_delays(mut futures: I) -> Result +/// Runs connection attempt futures. +/// +/// Accepts iterator of connection attempt futures +/// and runs them until one of them succeeds +/// or all of them fail. +/// +/// If all connection attempts fail, returns the first error. +/// +/// This functions starts with one connection attempt and maintains +/// up to five parallel connection attempts if connecting takes time. +pub(crate) async fn run_connection_attempts(mut futures: I) -> Result where I: Iterator, F: Future> + Send + 'static, @@ -149,7 +150,16 @@ where { let mut connection_attempt_set = JoinSet::new(); - let mut delays = CONNECTION_DELAYS.into_iter(); + // Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s. + // This way we can have up to 5 parallel connection attempts at the same time. + let mut delays = [ + Duration::from_millis(300), + Duration::from_secs(1), + Duration::from_secs(5), + Duration::from_secs(10), + ] + .into_iter(); + let mut first_error = None; loop { @@ -211,5 +221,5 @@ pub(crate) async fn connect_tcp( .await? .into_iter() .map(|resolved_addr| connect_tcp_inner(resolved_addr)); - run_futures_with_delays(connection_futures).await + run_connection_attempts(connection_futures).await } diff --git a/src/smtp/connect.rs b/src/smtp/connect.rs index e0af589afa..29169a6c63 100644 --- a/src/smtp/connect.rs +++ b/src/smtp/connect.rs @@ -12,7 +12,7 @@ use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp}; use crate::net::session::SessionBufStream; use crate::net::tls::wrap_tls; use crate::net::{ - connect_tcp_inner, connect_tls_inner, run_futures_with_delays, update_connection_history, + connect_tcp_inner, connect_tls_inner, run_connection_attempts, update_connection_history, }; use crate::oauth2::get_oauth2_access_token; use crate::socks::Socks5Config; @@ -164,7 +164,7 @@ async fn connect_stream( let host = host.to_string(); connection_attempt(context, host, security, resolved_addr, strict_tls) }); - run_futures_with_delays(connection_futures).await + run_connection_attempts(connection_futures).await } } From a7bae415ea81c174eadc3eb865b509e263da344d Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 28 Aug 2024 20:38:51 +0000 Subject: [PATCH 07/11] more generic error message --- src/net.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net.rs b/src/net.rs index 9ac779c3ed..3a120e7f7f 100644 --- a/src/net.rs +++ b/src/net.rs @@ -202,7 +202,7 @@ where // held by connection attempt tasks. connection_attempt_set.shutdown().await; - Err(first_error.unwrap_or_else(|| format_err!("No DNS resolution results"))) + Err(first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))) } /// If `load_cache` is true, may use cached DNS results. From 05757768a7ed04b7e6103dae4b88dd7d7ea3c5ba Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 28 Aug 2024 20:45:08 +0000 Subject: [PATCH 08/11] Always abort remaining connection attempts --- src/net.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/net.rs b/src/net.rs index 3a120e7f7f..3ba9503fba 100644 --- a/src/net.rs +++ b/src/net.rs @@ -162,7 +162,7 @@ where let mut first_error = None; - loop { + let res = loop { if let Some(fut) = futures.next() { connection_attempt_set.spawn(fut); } @@ -180,7 +180,7 @@ where match res.context("Failed to join task")? { Ok(conn) => { // Successfully connected. - return Ok(conn); + break Ok(conn); } Err(err) => { // Some connection attempt failed. @@ -192,17 +192,19 @@ where // Out of connection attempts. // // Break out of the loop and return error. - break; + break Err( + first_error.unwrap_or_else(|| format_err!("No connection attempts were made")) + ); } } - } + }; // Abort remaining connection attempts and free resources // such as OS sockets and `Context` references // held by connection attempt tasks. connection_attempt_set.shutdown().await; - Err(first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))) + res } /// If `load_cache` is true, may use cached DNS results. From 7b0ee35335e4089e6648c9d27bcda8085433ff87 Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 28 Aug 2024 20:52:10 +0000 Subject: [PATCH 09/11] shutdown remaning tasks even on join err --- src/net.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/net.rs b/src/net.rs index 3ba9503fba..e83ee3a6c7 100644 --- a/src/net.rs +++ b/src/net.rs @@ -177,15 +177,18 @@ where match res { Some(res) => { - match res.context("Failed to join task")? { - Ok(conn) => { + match res.context("Failed to join task") { + Ok(Ok(conn)) => { // Successfully connected. break Ok(conn); } - Err(err) => { + Ok(Err(err)) => { // Some connection attempt failed. first_error.get_or_insert(err); } + Err(err) => { + break Err(err); + } } } None => { From 8f46d760a00365386fccbed3c44d1bf5681f1492 Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 28 Aug 2024 21:07:38 +0000 Subject: [PATCH 10/11] clippy --- src/net.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net.rs b/src/net.rs index e83ee3a6c7..132cbff906 100644 --- a/src/net.rs +++ b/src/net.rs @@ -225,6 +225,6 @@ pub(crate) async fn connect_tcp( let connection_futures = lookup_host_with_cache(context, host, port, "", load_cache) .await? .into_iter() - .map(|resolved_addr| connect_tcp_inner(resolved_addr)); + .map(connect_tcp_inner); run_connection_attempts(connection_futures).await } From 16cbbf2fb18be00bc09605441d98d57c0a1813af Mon Sep 17 00:00:00 2001 From: link2xt Date: Wed, 28 Aug 2024 21:35:26 +0000 Subject: [PATCH 11/11] x --- src/configure.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/configure.rs b/src/configure.rs index c8f8a062e0..34c2cee70b 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -576,7 +576,7 @@ async fn get_autoconfig( async fn nicer_configuration_error(context: &Context, e: String) -> String { if e.to_lowercase().contains("could not resolve") - || e.to_lowercase().contains("no dns resolution results") + || e.to_lowercase().contains("connection attempts") || e.to_lowercase() .contains("temporary failure in name resolution") || e.to_lowercase().contains("name or service not known")