Skip to content

Commit

Permalink
feat: parallelize IMAP and SMTP connection attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Aug 26, 2024
1 parent f912bc7 commit 260a697
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 45 deletions.
146 changes: 122 additions & 24 deletions src/imap/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::net::SocketAddr;
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use anyhow::{format_err, Context as _, Result};
use async_imap::Client as ImapClient;
use async_imap::Session as ImapSession;
use fast_socks5::client::Socks5Stream;
use tokio::io::BufWriter;
use tokio::task::JoinSet;

use super::capabilities::Capabilities;
use super::session::Session;
Expand Down Expand Up @@ -106,6 +108,53 @@ impl Client {
Ok(Session::new(session, capabilities))
}

async fn connection_attempt(
context: Context,
host: String,
security: ConnectionSecurity,
resolved_addr: SocketAddr,
strict_tls: bool,
) -> Result<Self> {
let context = &context;
let host = &host;
info!(
context,
"Attempting IMAP connection to {host} ({resolved_addr})."
);
let res = match security {
ConnectionSecurity::Tls => {
Client::connect_secure(resolved_addr, host, strict_tls).await
}
ConnectionSecurity::Starttls => {
Client::connect_starttls(resolved_addr, host, strict_tls).await
}
ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await,
};
match res {
Ok(client) => {
let ip_addr = resolved_addr.ip().to_string();
let port = resolved_addr.port();

let save_cache = match security {
ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
ConnectionSecurity::Plain => false,
};
if save_cache {
update_connect_timestamp(context, host, &ip_addr).await?;
}
update_connection_history(context, "imap", host, port, &ip_addr, time()).await?;
Ok(client)
}
Err(err) => {
warn!(
context,
"Failed to connect to {host} ({resolved_addr}): {err:#}."
);
Err(err)
}
}
}

pub async fn connect(
context: &Context,
socks5_config: Option<Socks5Config>,
Expand All @@ -131,39 +180,88 @@ impl Client {
};
Ok(client)
} else {
let mut first_error = None;
let load_cache = match security {
ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
ConnectionSecurity::Plain => false,
};
for resolved_addr in
lookup_host_with_cache(context, host, port, "imap", load_cache).await?

let mut connection_attempt_set = JoinSet::new();
let mut delay_set = JoinSet::new();

let mut connection_futures = Vec::new();
for resolved_addr in lookup_host_with_cache(context, host, port, "imap", load_cache)
.await?
.into_iter()
.rev()
{
let res = match security {
ConnectionSecurity::Tls => {
Client::connect_secure(resolved_addr, host, strict_tls).await
}
ConnectionSecurity::Starttls => {
Client::connect_starttls(resolved_addr, host, strict_tls).await
}
ConnectionSecurity::Plain => Client::connect_insecure(resolved_addr).await,
};
match res {
Ok(client) => {
let ip_addr = resolved_addr.ip().to_string();
if load_cache {
update_connect_timestamp(context, host, &ip_addr).await?;
let context = context.clone();
let host = host.to_string();
let fut =
Self::connection_attempt(context, host, security, resolved_addr, strict_tls);
connection_futures.push(fut);
}

// Start with one connection.
if let Some(fut) = connection_futures.pop() {
connection_attempt_set.spawn(fut);
}

// Start second connection attempt 300 ms after the first.
delay_set.spawn(tokio::time::sleep(Duration::from_millis(300)));

// Start third connection attempt if we have not managed to connect in 10 seconds.
delay_set.spawn(tokio::time::sleep(Duration::from_secs(10)));

let mut first_error = None;
loop {
tokio::select! {
biased;

res = connection_attempt_set.join_next() => {
match res {
Some(res) => {
match res.context("Failed to join task")? {
Ok(conn) => {
// Successfully connected.
return Ok(conn);
},
Err(err) => {
// Some connection attempt failed.
first_error.get_or_insert(err);
}
}
},
None => {
// Out of connection attempts.
//
// Break out of the loop and return error.
break;
}
}
update_connection_history(context, "imap", host, port, &ip_addr, time())
.await?;
return Ok(client);
}
Err(err) => {
warn!(context, "Failed to connect to {resolved_addr}: {err:#}.");
first_error.get_or_insert(err);
},
_ = delay_set.join_next() => {
// Delay expired.
//
// Don't do anything other than adding
// another connection attempt to the active set.
}
}

if let Some(fut) = connection_futures.pop() {
connection_attempt_set.spawn(fut);
}
}

// Abort remaining connection attempts and free resources
// such as OS sockets and `Context` references
// held by connection attempt tasks.
//
// `delay_set` contains just `sleep` tasks
// so no need to await futures there,
// it is enough that futures are aborted
// when the set is dropped.
connection_attempt_set.shutdown().await;

Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}")))
}
}
Expand Down
138 changes: 117 additions & 21 deletions src/smtp/connect.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
//! SMTP connection establishment.
use std::net::SocketAddr;
use std::time::Duration;

use anyhow::{bail, format_err, Context as _, Result};
use async_smtp::{SmtpClient, SmtpTransport};
use tokio::io::BufStream;
use tokio::task::JoinSet;

use crate::context::Context;
use crate::login_param::{ConnectionCandidate, ConnectionSecurity};
Expand Down Expand Up @@ -72,6 +74,49 @@ pub(crate) async fn connect_and_auth(
Ok(transport)
}

async fn connection_attempt(
context: Context,
host: String,
security: ConnectionSecurity,
resolved_addr: SocketAddr,
strict_tls: bool,
) -> Result<Box<dyn SessionBufStream>> {
let context = &context;
let host = &host;
info!(
context,
"Attempting SMTP connection to {host} ({resolved_addr})."
);
let res = match security {
ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await,
ConnectionSecurity::Starttls => connect_starttls(resolved_addr, host, strict_tls).await,
ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
};
match res {
Ok(stream) => {
let ip_addr = resolved_addr.ip().to_string();
let port = resolved_addr.port();

let save_cache = match security {
ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
ConnectionSecurity::Plain => false,
};
if save_cache {
update_connect_timestamp(context, host, &ip_addr).await?;
}
update_connection_history(context, "smtp", host, port, &ip_addr, time()).await?;
Ok(stream)
}
Err(err) => {
warn!(
context,
"Failed to connect to {host} ({resolved_addr}): {err:#}."
);
Err(err)
}
}
}

/// Returns TLS, STARTTLS or plaintext connection
/// using SOCKS5 or direct connection depending on the given configuration.
///
Expand Down Expand Up @@ -106,37 +151,88 @@ async fn connect_stream(
};
Ok(stream)
} else {
let mut first_error = None;
let load_cache = match security {
ConnectionSecurity::Tls | ConnectionSecurity::Starttls => strict_tls,
ConnectionSecurity::Plain => false,
};

for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache).await?
let mut connection_attempt_set = JoinSet::new();
let mut delay_set = JoinSet::new();

let mut connection_futures = Vec::new();

for resolved_addr in lookup_host_with_cache(context, host, port, "smtp", load_cache)
.await?
.into_iter()
.rev()
{
let res = match security {
ConnectionSecurity::Tls => connect_secure(resolved_addr, host, strict_tls).await,
ConnectionSecurity::Starttls => {
connect_starttls(resolved_addr, host, strict_tls).await
}
ConnectionSecurity::Plain => connect_insecure(resolved_addr).await,
};
match res {
Ok(stream) => {
let ip_addr = resolved_addr.ip().to_string();
if load_cache {
update_connect_timestamp(context, host, &ip_addr).await?;
let context = context.clone();
let host = host.to_string();
let fut = connection_attempt(context, host, security, resolved_addr, strict_tls);
connection_futures.push(fut);
}

// Start with one connection.
if let Some(fut) = connection_futures.pop() {
connection_attempt_set.spawn(fut);
}

// Start second connection attempt 300 ms after the first.
delay_set.spawn(tokio::time::sleep(Duration::from_millis(300)));

// Start third connection attempt if we have not managed to connect in 10 seconds.
delay_set.spawn(tokio::time::sleep(Duration::from_secs(10)));

let mut first_error = None;
loop {
tokio::select! {
biased;

res = connection_attempt_set.join_next() => {
match res {
Some(res) => {
match res.context("Failed to join task")? {
Ok(conn) => {
// Successfully connected.
return Ok(conn);
},
Err(err) => {
// Some connection attempt failed.
first_error.get_or_insert(err);
}
}
},
None => {
// Out of connection attempts.
//
// Break out of the loop and return error.
break;
}
}
update_connection_history(context, "smtp", host, port, &ip_addr, time())
.await?;
return Ok(stream);
}
Err(err) => {
warn!(context, "Failed to connect to {resolved_addr}: {err:#}.");
first_error.get_or_insert(err);
},
_ = delay_set.join_next() => {
// Delay expired.
//
// Don't do anything other than adding
// another connection attempt to the active set.
}
}

if let Some(fut) = connection_futures.pop() {
connection_attempt_set.spawn(fut);
}
}

// Abort remaining connection attempts and free resources
// such as OS sockets and `Context` references
// held by connection attempt tasks.
//
// `delay_set` contains just `sleep` tasks
// so no need to await futures there,
// it is enough that futures are aborted
// when the set is dropped.
connection_attempt_set.shutdown().await;

Err(first_error.unwrap_or_else(|| format_err!("no DNS resolution results for {host}")))
}
}
Expand Down

0 comments on commit 260a697

Please sign in to comment.