Skip to content

Commit

Permalink
Use exponential backoff for publish retries
Browse files Browse the repository at this point in the history
  • Loading branch information
charliermarsh committed Nov 20, 2024
1 parent ccc0962 commit ac0b055
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 57 deletions.
2 changes: 1 addition & 1 deletion crates/uv-client/src/base_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ impl RetryableStrategy for UvRetryableStrategy {
/// Check for additional transient error kinds not supported by the default retry strategy in `reqwest_retry`.
///
/// These cases should be safe to retry with [`Retryable::Transient`].
pub(crate) fn is_extended_transient_error(err: &dyn Error) -> bool {
pub fn is_extended_transient_error(err: &dyn Error) -> bool {
trace!("Attempting to retry error: {err:?}");

if let Some(err) = find_source::<WrappedReqwestError>(&err) {
Expand Down
3 changes: 2 additions & 1 deletion crates/uv-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub use base_client::{
AuthIntegration, BaseClient, BaseClientBuilder, UvRetryableStrategy, DEFAULT_RETRIES,
is_extended_transient_error, AuthIntegration, BaseClient, BaseClientBuilder,
UvRetryableStrategy, DEFAULT_RETRIES,
};
pub use cached_client::{CacheControl, CachedClient, CachedClientError, DataWithCachePolicy};
pub use error::{Error, ErrorKind, WrappedReqwestError};
Expand Down
140 changes: 91 additions & 49 deletions crates/uv-publish/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@ use reqwest::header::AUTHORIZATION;
use reqwest::multipart::Part;
use reqwest::{Body, Response, StatusCode};
use reqwest_middleware::RequestBuilder;
use reqwest_retry::{Retryable, RetryableStrategy};
use reqwest_retry::RetryPolicy;
use rustc_hash::FxHashSet;
use serde::Deserialize;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::{env, fmt, io};
use thiserror::Error;
use tokio::io::{AsyncReadExt, BufReader};
use tokio_util::io::ReaderStream;
use tracing::{debug, enabled, trace, Level};
use url::Url;
use uv_client::{BaseClient, OwnedArchive, RegistryClientBuilder, UvRetryableStrategy};
use uv_client::{is_extended_transient_error, BaseClient, OwnedArchive, RegistryClientBuilder};
use uv_configuration::{KeyringProviderType, TrustedPublishing};
use uv_distribution_filename::{DistFilename, SourceDistExtension, SourceDistFilename};
use uv_fs::{ProgressReader, Simplified};
Expand Down Expand Up @@ -354,86 +355,127 @@ pub async fn check_trusted_publishing(
}
}

/// Upload a file to a registry.
///
/// Returns `true` if the file was newly uploaded and `false` if it already existed.
///
/// Implements a custom retry flow since the request isn't cloneable.
pub async fn upload(
/// Upload a file to a registry, with retries.
pub async fn upload_with_retry(
file: &Path,
raw_filename: &str,
filename: &DistFilename,
registry: &Url,
client: &BaseClient,
retries: u32,
username: Option<&str>,
password: Option<&str>,
check_url_client: Option<&CheckUrlClient<'_>>,
reporter: Arc<impl Reporter>,
) -> Result<bool, PublishError> {
let form_metadata = form_metadata(file, filename)
.await
.map_err(|err| PublishError::PublishPrepare(file.to_path_buf(), Box::new(err)))?;

// Retry loop
let mut attempt = 0;
let mut n_past_retries = 0;
let start_time = SystemTime::now();
let retry_policy = client.retry_policy();
loop {
let (request, idx) = build_request(
let result = upload(
file,
raw_filename,
filename,
registry,
client,
username,
password,
&form_metadata,
check_url_client,
reporter.clone(),
)
.await;
if result
.as_ref()
.err()
.is_some_and(|err| is_extended_transient_error(err))
{
let retry_decision = retry_policy.should_retry(start_time, n_past_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
warn_user!("Transient failure while handling response for {registry}; retrying...",);
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());
tokio::time::sleep(duration).await;
n_past_retries += 1;
continue;
}
}
return result;
}
}

/// Upload a file to a registry.
///
/// Returns `true` if the file was newly uploaded and `false` if it already existed.
///
/// Implements a custom retry flow since the request isn't cloneable.
pub async fn upload(
file: &Path,
raw_filename: &str,
filename: &DistFilename,
registry: &Url,
client: &BaseClient,
username: Option<&str>,
password: Option<&str>,
check_url_client: Option<&CheckUrlClient<'_>>,
reporter: Arc<impl Reporter>,
) -> Result<bool, PublishError> {
let form_metadata = form_metadata(file, filename)
.await
.map_err(|err| PublishError::PublishPrepare(file.to_path_buf(), Box::new(err)))?;

let result = request.send().await;
if attempt < retries && UvRetryableStrategy.handle(&result) == Some(Retryable::Transient) {
reporter.on_download_complete(idx);
warn_user!("Transient request failure for {}, retrying", registry);
attempt += 1;
continue;
}

let response = result.map_err(|err| {
let (request, idx) = build_request(
file,
raw_filename,
filename,
registry,
client,
username,
password,
&form_metadata,
reporter.clone(),
)
.await
.map_err(|err| PublishError::PublishPrepare(file.to_path_buf(), Box::new(err)))?;

let result = request.send().await;

let response = result
.map_err(|err| {
PublishError::PublishSend(
file.to_path_buf(),
registry.clone(),
PublishSendError::ReqwestMiddleware(err),
)
})
.inspect_err(|_| {
reporter.on_download_complete(idx);
})?;

return match handle_response(registry, response).await {
Ok(()) => {
// Upload successful; for PyPI this can also mean a hash match in a raced upload
// (but it doesn't tell us), for other registries it should mean a fresh upload.
Ok(true)
}
Err(err) => {
if matches!(
err,
PublishSendError::Status(..) | PublishSendError::StatusNoBody(..)
) {
if let Some(check_url_client) = &check_url_client {
if check_url(check_url_client, file, filename).await? {
// There was a raced upload of the same file, so even though our upload failed,
// the right file now exists in the registry.
return Ok(false);
}
match handle_response(registry, response).await {
Ok(()) => {
// Upload successful; for PyPI this can also mean a hash match in a raced upload
// (but it doesn't tell us), for other registries it should mean a fresh upload.
Ok(true)
}
Err(err) => {
if matches!(
err,
PublishSendError::Status(..) | PublishSendError::StatusNoBody(..)
) {
if let Some(check_url_client) = &check_url_client {
if check_url(check_url_client, file, filename).await? {
// There was a raced upload of the same file, so even though our upload failed,
// the right file now exists in the registry.
return Ok(false);
}
}
Err(PublishError::PublishSend(
file.to_path_buf(),
registry.clone(),
err,
))
}
};
Err(PublishError::PublishSend(
file.to_path_buf(),
registry.clone(),
err,
))
}
}
}

Expand Down
10 changes: 4 additions & 6 deletions crates/uv/src/commands/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ use std::time::Duration;
use tracing::info;
use url::Url;
use uv_cache::Cache;
use uv_client::{
AuthIntegration, BaseClientBuilder, Connectivity, RegistryClientBuilder, DEFAULT_RETRIES,
};
use uv_client::{AuthIntegration, BaseClientBuilder, Connectivity, RegistryClientBuilder};
use uv_configuration::{KeyringProviderType, TrustedHost, TrustedPublishing};
use uv_distribution_types::{Index, IndexCapabilities, IndexLocations, IndexUrl};
use uv_publish::{
check_trusted_publishing, files_for_publishing, upload, CheckUrlClient, TrustedPublishResult,
check_trusted_publishing, files_for_publishing, upload_with_retry, CheckUrlClient,
TrustedPublishResult,
};

pub(crate) async fn publish(
Expand Down Expand Up @@ -170,13 +169,12 @@ pub(crate) async fn publish(
format!("({bytes:.1}{unit})").dimmed()
)?;
let reporter = PublishReporter::single(printer);
let uploaded = upload(
let uploaded = upload_with_retry(
&file,
&raw_filename,
&filename,
&publish_url,
&upload_client,
DEFAULT_RETRIES,
username.as_deref(),
password.as_deref(),
check_url_client.as_ref(),
Expand Down

0 comments on commit ac0b055

Please sign in to comment.