Skip to content

Commit

Permalink
feat: parallelize IMAP and SMTP connection attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Aug 27, 2024
1 parent 273158a commit ce32ded
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 47 deletions.
147 changes: 122 additions & 25 deletions src/imap/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use async_imap::Client as ImapClient;
use async_imap::Session as ImapSession;
use fast_socks5::client::Socks5Stream;
use tokio::io::BufWriter;
use tokio::task::JoinSet;

use super::capabilities::Capabilities;
use super::session::Session;
Expand All @@ -14,7 +15,9 @@ use crate::login_param::{ConnectionCandidate, ConnectionSecurity};
use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
use crate::net::session::SessionStream;
use crate::net::tls::wrap_tls;
use crate::net::{connect_tcp_inner, connect_tls_inner, update_connection_history};
use crate::net::{
connect_tcp_inner, connect_tls_inner, update_connection_history, CONNECTION_DELAYS,
};
use crate::socks::Socks5Config;
use crate::tools::time;

Expand Down Expand Up @@ -106,6 +109,53 @@ impl Client {
Ok(Session::new(session, capabilities))
}

async fn connection_attempt(
context: Context,
host: String,
security: ConnectionSecurity,
resolved_addr: SocketAddr,
strict_tls: bool,
) -> Result<Self> {
let context = &context;
let host = &host;
info!(
context,
"Attempting IMAP connection to {host} ({resolved_addr})."
);
let res = match security {
ConnectionSecurity::Tls => {
Client::connect_secure(resolved_addr, host, strict_tls).await
}
ConnectionSecurity::Starttls => {
Client::connect_starttls(resolved_addr, host, strict_tls).await
}
ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await,
};
match res {
Ok(client) => {
let ip_addr = resolved_addr.ip().to_string();
let port = resolved_addr.port();

let save_cache = match security {
ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
ConnectionSecurity::Plain => false,
};
if save_cache {
update_connect_timestamp(context, host, &ip_addr).await?;
}
update_connection_history(context, "imap", host, port, &ip_addr, time()).await?;
Ok(client)
}
Err(err) => {
warn!(
context,
"Failed to connect to {host} ({resolved_addr}): {err:#}."
);
Err(err)
}
}
}

pub async fn connect(
context: &Context,
socks5_config: Option<Socks5Config>,
Expand All @@ -131,39 +181,86 @@ impl Client {
};
Ok(client)
} else {
let mut first_error = None;
let load_cache = match security {
ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
ConnectionSecurity::Plain => false,
};
for resolved_addr in
lookup_host_with_cache(context, host, port, "imap", load_cache).await?

let mut connection_attempt_set = JoinSet::new();
let mut delay_set = JoinSet::new();

let mut connection_futures = Vec::new();
for resolved_addr in lookup_host_with_cache(context, host, port, "imap", load_cache)
.await?
.into_iter()
.rev()
{
let res = match security {
ConnectionSecurity::Tls => {
Client::connect_secure(resolved_addr, host, strict_tls).await
}
ConnectionSecurity::Starttls => {
Client::connect_starttls(resolved_addr, host, strict_tls).await
}
ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await,
};
match res {
Ok(client) => {
let ip_addr = resolved_addr.ip().to_string();
if load_cache {
update_connect_timestamp(context, host, &ip_addr).await?;
let context = context.clone();
let host = host.to_string();
let fut =
Self::connection_attempt(context, host, security, resolved_addr, strict_tls);
connection_futures.push(fut);
}

// Start with one connection.
if let Some(fut) = connection_futures.pop() {
connection_attempt_set.spawn(fut);
}

for delay in CONNECTION_DELAYS {
delay_set.spawn(tokio::time::sleep(delay));
}

let mut first_error = None;
loop {
tokio::select! {
biased;

res = connection_attempt_set.join_next() => {
match res {
Some(res) => {
match res.context("Failed to join task")? {
Ok(conn) => {
// Successfully connected.
return Ok(conn);
},
Err(err) => {
// Some connection attempt failed.
first_error.get_or_insert(err);
}
}
},
None => {
// Out of connection attempts.
//
// Break out of the loop and return error.
break;
}
}
update_connection_history(context, "imap", host, port, &ip_addr, time())
.await?;
return Ok(client);
}
Err(err) => {
warn!(context, "Failed to connect to {resolved_addr}: {err:#}.");
first_error.get_or_insert(err);
},
_ = delay_set.join_next(), if !delay_set.is_empty() => {
// Delay expired.
//
// Don't do anything other than adding
// another connection attempt to the active set.
}
}

if let Some(fut) = connection_futures.pop() {
connection_attempt_set.spawn(fut);
}
}

// Abort remaining connection attempts and free resources
// such as OS sockets and `Context` references
// held by connection attempt tasks.
//
// `delay_set` contains just `sleep` tasks
// so no need to await futures there,
// it is enough that futures are aborted
// when the set is dropped.
connection_attempt_set.shutdown().await;

Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}")))
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ pub(crate) const TRANSACTION_TIMEOUT: Duration = Duration::from_secs(300);
/// TTL for caches in seconds.
pub(crate) const CACHE_TTL: u64 = 30 * 24 * 60 * 60;

/// Start additional connection attempts after 300 ms, 5 s, 10 s and 15 s.
/// This way we can have up to 5 parallel connection attempts at the same time.
pub(crate) const CONNECTION_DELAYS: [Duration; 4] = [
Duration::from_millis(300),
Duration::from_secs(5),
Duration::from_secs(10),
Duration::from_secs(15),
];

/// Removes connection history entries after `CACHE_TTL`.
pub(crate) async fn prune_connection_history(context: &Context) -> Result<()> {
let now = time();
Expand Down
139 changes: 117 additions & 22 deletions src/smtp/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ use std::net::SocketAddr;
use anyhow::{bail, format_err, Context as _, Result};
use async_smtp::{SmtpClient, SmtpTransport};
use tokio::io::BufStream;
use tokio::task::JoinSet;

use crate::context::Context;
use crate::login_param::{ConnectionCandidate, ConnectionSecurity};
use crate::net::dns::{lookup_host_with_cache, update_connect_timestamp};
use crate::net::session::SessionBufStream;
use crate::net::tls::wrap_tls;
use crate::net::{connect_tcp_inner, connect_tls_inner, update_connection_history};
use crate::net::{
connect_tcp_inner, connect_tls_inner, update_connection_history, CONNECTION_DELAYS,
};
use crate::oauth2::get_oauth2_access_token;
use crate::socks::Socks5Config;
use crate::tools::time;
Expand Down Expand Up @@ -72,6 +75,49 @@ pub(crate) async fn connect_and_auth(
Ok(transport)
}

async fn connection_attempt(
context: Context,
host: String,
security: ConnectionSecurity,
resolved_addr: SocketAddr,
strict_tls: bool,
) -> Result<Box<dyn SessionBufStream>> {
let context = &context;
let host = &host;
info!(
context,
"Attempting SMTP connection to {host} ({resolved_addr})."
);
let res = match security {
ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await,
ConnectionSecurity::Starttls => connect_starttls(resolved_addr, host, strict_tls).await,
ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
};
match res {
Ok(stream) => {
let ip_addr = resolved_addr.ip().to_string();
let port = resolved_addr.port();

let save_cache = match security {
ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
ConnectionSecurity::Plain => false,
};
if save_cache {
update_connect_timestamp(context, host, &ip_addr).await?;
}
update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?;
Ok(stream)
}
Err(err) => {
warn!(
context,
"Failed to connect to {host} ({resolved_addr}): {err:#}."
);
Err(err)
}
}
}

/// Returns TLS, STARTTLS or plaintext connection
/// using SOCKS5 or direct connection depending on the given configuration.
///
Expand Down Expand Up @@ -106,37 +152,86 @@ async fn connect_stream(
};
Ok(stream)
} else {
let mut first_error = None;
let load_cache = match security {
ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
ConnectionSecurity::Plain => false,
};

for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache).await?
let mut connection_attempt_set = JoinSet::new();
let mut delay_set = JoinSet::new();

let mut connection_futures = Vec::new();

for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache)
.await?
.into_iter()
.rev()
{
let res = match security {
ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await,
ConnectionSecurity::Starttls => {
connect_starttls(resolved_addr, host, strict_tls).await
}
ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
};
match res {
Ok(stream) => {
let ip_addr = resolved_addr.ip().to_string();
if load_cache {
update_connect_timestamp(context, host, &ip_addr).await?;
let context = context.clone();
let host = host.to_string();
let fut = connection_attempt(context, host, security, resolved_addr, strict_tls);
connection_futures.push(fut);
}

// Start with one connection.
if let Some(fut) = connection_futures.pop() {
connection_attempt_set.spawn(fut);
}

for delay in CONNECTION_DELAYS {
delay_set.spawn(tokio::time::sleep(delay));
}

let mut first_error = None;
loop {
tokio::select! {
biased;

res = connection_attempt_set.join_next() => {
match res {
Some(res) => {
match res.context("Failed to join task")? {
Ok(conn) => {
// Successfully connected.
return Ok(conn);
},
Err(err) => {
// Some connection attempt failed.
first_error.get_or_insert(err);
}
}
},
None => {
// Out of connection attempts.
//
// Break out of the loop and return error.
break;
}
}
update_connection_history(context, "smtp", host, port, &ip_addr, time())
.await?;
return Ok(stream);
}
Err(err) => {
warn!(context, "Failed to connect to {resolved_addr}: {err:#}.");
first_error.get_or_insert(err);
},
_ = delay_set.join_next(), if !delay_set.is_empty() => {
// Delay expired.
//
// Don't do anything other than adding
// another connection attempt to the active set.
}
}

if let Some(fut) = connection_futures.pop() {
connection_attempt_set.spawn(fut);
}
}

// Abort remaining connection attempts and free resources
// such as OS sockets and `Context` references
// held by connection attempt tasks.
//
// `delay_set` contains just `sleep` tasks
// so no need to await futures there,
// it is enough that futures are aborted
// when the set is dropped.
connection_attempt_set.shutdown().await;

Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}")))
}
}
Expand Down

0 comments on commit ce32ded

Please sign in to comment.