Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: start new connections independently of connection failures #5927

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 42 additions & 30 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,15 @@ where

// Start additional connection attempts after 300 ms, 1 s, 5 s and 10 s.
// This way we can have up to 5 parallel connection attempts at the same time.
let mut delays = [
let mut delay_set = JoinSet::new();
for delay in [
Duration::from_millis(300),
Duration::from_secs(1),
Duration::from_secs(5),
Duration::from_secs(10),
]
.into_iter();
] {
delay_set.spawn(tokio::time::sleep(delay));
}

let mut first_error = None;

Expand All @@ -167,44 +169,54 @@ where
connection_attempt_set.spawn(fut);
}

let one_year = Duration::from_secs(60 * 60 * 24 * 365);
let delay = delays.next().unwrap_or(one_year); // one year can be treated as infinitely long here
let Ok(res) = timeout(delay, connection_attempt_set.join_next()).await else {
// The delay for starting the next connection attempt has expired.
// `continue` the loop to push the next connection into connection_attempt_set.
continue;
};

match res {
Some(res) => {
match res.context("Failed to join task") {
Ok(Ok(conn)) => {
// Successfully connected.
break Ok(conn);
tokio::select! {
biased;

res = connection_attempt_set.join_next() => {
match res {
Some(res) => {
match res.context("Failed to join task") {
Ok(Ok(conn)) => {
// Successfully connected.
break Ok(conn);
}
Ok(Err(err)) => {
// Some connection attempt failed.
first_error.get_or_insert(err);
}
Err(err) => {
break Err(err);
}
}
}
Ok(Err(err)) => {
// Some connection attempt failed.
first_error.get_or_insert(err);
}
Err(err) => {
break Err(err);
None => {
// Out of connection attempts.
//
// Break out of the loop and return error.
break Err(
first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
);
}
}
}
None => {
// Out of connection attempts.
},

_ = delay_set.join_next(), if !delay_set.is_empty() => {
// Delay expired.
//
// Break out of the loop and return error.
break Err(
first_error.unwrap_or_else(|| format_err!("No connection attempts were made"))
);
// Don't do anything other than pushing
// another connection attempt into `connection_attempt_set`.
}
}
};

// Abort remaining connection attempts and free resources
// such as OS sockets and `Context` references
// held by connection attempt tasks.
//
// `delay_set` contains just `sleep` tasks
// so no need to await futures there,
// it is enough that futures are aborted
// when the set is dropped.
connection_attempt_set.shutdown().await;

res
Expand Down
Loading