diff --git a/crates/uv-client/src/base_client.rs b/crates/uv-client/src/base_client.rs index f4ade97879767..1090b1f584c54 100644 --- a/crates/uv-client/src/base_client.rs +++ b/crates/uv-client/src/base_client.rs @@ -460,7 +460,7 @@ impl RetryableStrategy for UvRetryableStrategy { /// Check for additional transient error kinds not supported by the default retry strategy in `reqwest_retry`. /// /// These cases should be safe to retry with [`Retryable::Transient`]. -pub(crate) fn is_extended_transient_error(err: &dyn Error) -> bool { +pub fn is_extended_transient_error(err: &dyn Error) -> bool { trace!("Attempting to retry error: {err:?}"); if let Some(err) = find_source::(&err) { diff --git a/crates/uv-client/src/lib.rs b/crates/uv-client/src/lib.rs index bd496aac1213c..54b7e9fa77852 100644 --- a/crates/uv-client/src/lib.rs +++ b/crates/uv-client/src/lib.rs @@ -1,5 +1,6 @@ pub use base_client::{ - AuthIntegration, BaseClient, BaseClientBuilder, UvRetryableStrategy, DEFAULT_RETRIES, + is_extended_transient_error, AuthIntegration, BaseClient, BaseClientBuilder, + UvRetryableStrategy, DEFAULT_RETRIES, }; pub use cached_client::{CacheControl, CachedClient, CachedClientError, DataWithCachePolicy}; pub use error::{Error, ErrorKind, WrappedReqwestError}; diff --git a/crates/uv-publish/src/lib.rs b/crates/uv-publish/src/lib.rs index 4ecc17f89a863..0bffafdacf78b 100644 --- a/crates/uv-publish/src/lib.rs +++ b/crates/uv-publish/src/lib.rs @@ -11,18 +11,19 @@ use reqwest::header::AUTHORIZATION; use reqwest::multipart::Part; use reqwest::{Body, Response, StatusCode}; use reqwest_middleware::RequestBuilder; -use reqwest_retry::{Retryable, RetryableStrategy}; +use reqwest_retry::RetryPolicy; use rustc_hash::FxHashSet; use serde::Deserialize; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::time::{Duration, SystemTime}; use std::{env, fmt, io}; use thiserror::Error; use tokio::io::{AsyncReadExt, BufReader}; use tokio_util::io::ReaderStream; use tracing::{debug, enabled, trace, Level}; use url::Url; -use uv_client::{BaseClient, OwnedArchive, RegistryClientBuilder, UvRetryableStrategy}; +use uv_client::{is_extended_transient_error, BaseClient, OwnedArchive, RegistryClientBuilder}; use uv_configuration::{KeyringProviderType, TrustedPublishing}; use uv_distribution_filename::{DistFilename, SourceDistExtension, SourceDistFilename}; use uv_fs::{ProgressReader, Simplified}; @@ -354,31 +355,23 @@ pub async fn check_trusted_publishing( } } -/// Upload a file to a registry. -/// -/// Returns `true` if the file was newly uploaded and `false` if it already existed. -/// -/// Implements a custom retry flow since the request isn't cloneable. -pub async fn upload( +/// Upload a file to a registry, with retries. +pub async fn upload_with_retry( file: &Path, raw_filename: &str, filename: &DistFilename, registry: &Url, client: &BaseClient, - retries: u32, username: Option<&str>, password: Option<&str>, check_url_client: Option<&CheckUrlClient<'_>>, reporter: Arc, ) -> Result { - let form_metadata = form_metadata(file, filename) - .await - .map_err(|err| PublishError::PublishPrepare(file.to_path_buf(), Box::new(err)))?; - - // Retry loop - let mut attempt = 0; + let mut n_past_retries = 0; + let start_time = SystemTime::now(); + let retry_policy = client.retry_policy(); loop { - let (request, idx) = build_request( + let result = upload( file, raw_filename, filename, @@ -386,54 +379,103 @@ pub async fn upload( client, username, password, - &form_metadata, + check_url_client, reporter.clone(), ) + .await; + if result + .as_ref() + .err() + .is_some_and(|err| is_extended_transient_error(err)) + { + let retry_decision = retry_policy.should_retry(start_time, n_past_retries); + if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision { + warn_user!("Transient failure while handling response for {registry}; retrying...",); + let duration = execute_after + .duration_since(SystemTime::now()) + .unwrap_or_else(|_| Duration::default()); + tokio::time::sleep(duration).await; + n_past_retries += 1; + continue; + } + } + return result; + } +} + +/// Upload a file to a registry. +/// +/// Returns `true` if the file was newly uploaded and `false` if it already existed. +/// +/// Implements a custom retry flow since the request isn't cloneable. +pub async fn upload( + file: &Path, + raw_filename: &str, + filename: &DistFilename, + registry: &Url, + client: &BaseClient, + username: Option<&str>, + password: Option<&str>, + check_url_client: Option<&CheckUrlClient<'_>>, + reporter: Arc, +) -> Result { + let form_metadata = form_metadata(file, filename) .await .map_err(|err| PublishError::PublishPrepare(file.to_path_buf(), Box::new(err)))?; - let result = request.send().await; - if attempt < retries && UvRetryableStrategy.handle(&result) == Some(Retryable::Transient) { - reporter.on_download_complete(idx); - warn_user!("Transient request failure for {}, retrying", registry); - attempt += 1; - continue; - } - - let response = result.map_err(|err| { + let (request, idx) = build_request( + file, + raw_filename, + filename, + registry, + client, + username, + password, + &form_metadata, + reporter.clone(), + ) + .await + .map_err(|err| PublishError::PublishPrepare(file.to_path_buf(), Box::new(err)))?; + + let result = request.send().await; + + let response = result + .map_err(|err| { PublishError::PublishSend( file.to_path_buf(), registry.clone(), PublishSendError::ReqwestMiddleware(err), ) + }) + .inspect_err(|_| { + reporter.on_download_complete(idx); })?; - return match handle_response(registry, response).await { - Ok(()) => { - // Upload successful; for PyPI this can also mean a hash match in a raced upload - // (but it doesn't tell us), for other registries it should mean a fresh upload. - Ok(true) - } - Err(err) => { - if matches!( - err, - PublishSendError::Status(..) | PublishSendError::StatusNoBody(..) - ) { - if let Some(check_url_client) = &check_url_client { - if check_url(check_url_client, file, filename).await? { - // There was a raced upload of the same file, so even though our upload failed, - // the right file now exists in the registry. - return Ok(false); - } + match handle_response(registry, response).await { + Ok(()) => { + // Upload successful; for PyPI this can also mean a hash match in a raced upload + // (but it doesn't tell us), for other registries it should mean a fresh upload. + Ok(true) + } + Err(err) => { + if matches!( + err, + PublishSendError::Status(..) | PublishSendError::StatusNoBody(..) + ) { + if let Some(check_url_client) = &check_url_client { + if check_url(check_url_client, file, filename).await? { + // There was a raced upload of the same file, so even though our upload failed, + // the right file now exists in the registry. + return Ok(false); } } - Err(PublishError::PublishSend( - file.to_path_buf(), - registry.clone(), - err, - )) } - }; + Err(PublishError::PublishSend( + file.to_path_buf(), + registry.clone(), + err, + )) + } } } diff --git a/crates/uv/src/commands/publish.rs b/crates/uv/src/commands/publish.rs index 3d37a4fe52104..554dd6f8b4ca5 100644 --- a/crates/uv/src/commands/publish.rs +++ b/crates/uv/src/commands/publish.rs @@ -11,13 +11,12 @@ use std::time::Duration; use tracing::info; use url::Url; use uv_cache::Cache; -use uv_client::{ - AuthIntegration, BaseClientBuilder, Connectivity, RegistryClientBuilder, DEFAULT_RETRIES, -}; +use uv_client::{AuthIntegration, BaseClientBuilder, Connectivity, RegistryClientBuilder}; use uv_configuration::{KeyringProviderType, TrustedHost, TrustedPublishing}; use uv_distribution_types::{Index, IndexCapabilities, IndexLocations, IndexUrl}; use uv_publish::{ - check_trusted_publishing, files_for_publishing, upload, CheckUrlClient, TrustedPublishResult, + check_trusted_publishing, files_for_publishing, upload_with_retry, CheckUrlClient, + TrustedPublishResult, }; pub(crate) async fn publish( @@ -170,13 +169,12 @@ pub(crate) async fn publish( format!("({bytes:.1}{unit})").dimmed() )?; let reporter = PublishReporter::single(printer); - let uploaded = upload( + let uploaded = upload_with_retry( &file, &raw_filename, &filename, &publish_url, &upload_client, - DEFAULT_RETRIES, username.as_deref(), password.as_deref(), check_url_client.as_ref(),