From c78fca3bbcea2deeb1e276a0a3dda6243f974de4 Mon Sep 17 00:00:00 2001 From: Jason Overmier Date: Sun, 24 Aug 2025 04:20:31 +0000 Subject: [PATCH 1/2] feat(aws_s3): add S3 compatibility environment variables Add AWS_S3_DISABLE_SSE and AWS_S3_DISABLE_CHECKSUMS environment variables to improve compatibility with S3-compatible storage services like MinIO and Rook Ceph RGW. Changes: - Add AWS_S3_DISABLE_SSE environment variable to optionally disable server-side encryption headers - Add AWS_S3_DISABLE_CHECKSUMS environment variable to optionally disable CRC32 checksum headers - Update multipart upload creation to conditionally add headers - Update individual part uploads to conditionally add checksums - Maintain backward compatibility (disabled by default) This resolves compatibility issues with S3-compatible services that don't support AWS-specific encryption or checksum features, while maintaining full compatibility with AWS S3. Fixes: Upload failures with InvalidArgument and NotImplemented errors on non-AWS S3 services --- crates/aws_s3/src/storage.rs | 47 ++++++++++++++++++++++++++++-------- crates/aws_utils/src/lib.rs | 24 ++++++++++++++++++ 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/crates/aws_s3/src/storage.rs b/crates/aws_s3/src/storage.rs index a6fe4bd28..c4d865e1a 100644 --- a/crates/aws_s3/src/storage.rs +++ b/crates/aws_s3/src/storage.rs @@ -26,6 +26,8 @@ use aws_sdk_s3::{ Client, }; use aws_utils::{ + are_checksums_disabled, + is_sse_disabled, must_s3_config_from_env, s3::S3Client, }; @@ -216,15 +218,25 @@ impl Storage for S3Storage { async fn start_upload(&self) -> anyhow::Result> { let key: ObjectKey = self.runtime.new_uuid_v4().to_string().try_into()?; let s3_key = S3Key(self.key_prefix.clone() + &key); - let output = self + let mut upload_builder = self .client .create_multipart_upload() .bucket(self.bucket.clone()) - .key(&s3_key.0) - .server_side_encryption(ServerSideEncryption::Aes256) + .key(&s3_key.0); + + // Add server-side encryption if not disabled for S3 compatibility + if !is_sse_disabled() { + upload_builder = upload_builder.server_side_encryption(ServerSideEncryption::Aes256); + } + + // Add checksum algorithm if not disabled for S3 compatibility + if !are_checksums_disabled() { // Because we're using multipart uploads, we're really specifying the part checksum // algorithm here, so it needs to match what we use for each part. - .checksum_algorithm(ChecksumAlgorithm::Crc32) + upload_builder = upload_builder.checksum_algorithm(ChecksumAlgorithm::Crc32); + } + + let output = upload_builder .send() .await .context("Failed to create multipart upload")?; @@ -254,15 +266,25 @@ impl Storage for S3Storage { async fn start_client_driven_upload(&self) -> anyhow::Result { let key: ObjectKey = self.runtime.new_uuid_v4().to_string().try_into()?; let s3_key = S3Key(self.key_prefix.clone() + &key); - let output = self + let mut upload_builder = self .client .create_multipart_upload() .bucket(self.bucket.clone()) - .key(&s3_key.0) - .server_side_encryption(ServerSideEncryption::Aes256) + .key(&s3_key.0); + + // Add server-side encryption if not disabled for S3 compatibility + if !is_sse_disabled() { + upload_builder = upload_builder.server_side_encryption(ServerSideEncryption::Aes256); + } + + // Add checksum algorithm if not disabled for S3 compatibility + if !are_checksums_disabled() { // Because we're using multipart uploads, we're really specifying the part checksum // algorithm here, so it needs to match what we use for each part. - .checksum_algorithm(ChecksumAlgorithm::Crc32) + upload_builder = upload_builder.checksum_algorithm(ChecksumAlgorithm::Crc32); + } + + let output = upload_builder .send() .await .context("Failed to create multipart upload")?; @@ -565,15 +587,20 @@ impl S3Upload { let part_number = self.next_part_number()?; crate::metrics::log_aws_s3_part_upload_size_bytes(data.len()); - let builder = self + let mut builder = self .client .upload_part() - .checksum_algorithm(ChecksumAlgorithm::Crc32) .body(ByteStream::from(data)) .bucket(self.bucket.clone()) .key(&self.s3_key.0) .part_number(Into::::into(part_number) as i32) .upload_id(self.upload_id.to_string()); + + // Add checksum algorithm if not disabled for S3 compatibility + if !are_checksums_disabled() { + builder = builder.checksum_algorithm(ChecksumAlgorithm::Crc32); + } + Ok(UploadPart { part_number, builder, diff --git a/crates/aws_utils/src/lib.rs b/crates/aws_utils/src/lib.rs index 3a22d10d7..90a0045f2 100644 --- a/crates/aws_utils/src/lib.rs +++ b/crates/aws_utils/src/lib.rs @@ -33,6 +33,20 @@ static AWS_S3_FORCE_PATH_STYLE: LazyLock = LazyLock::new(|| { .unwrap_or_default() }); +static AWS_S3_DISABLE_SSE: LazyLock = LazyLock::new(|| { + env::var("AWS_S3_DISABLE_SSE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or_default() +}); + +static AWS_S3_DISABLE_CHECKSUMS: LazyLock = LazyLock::new(|| { + env::var("AWS_S3_DISABLE_CHECKSUMS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or_default() +}); + /// Similar aws_config::from_env but returns an error if credentials or /// region is are not. It also doesn't spew out log lines every time /// credentials are accessed. @@ -62,3 +76,13 @@ pub async fn must_s3_config_from_env() -> anyhow::Result { s3_config_builder = s3_config_builder.force_path_style(*AWS_S3_FORCE_PATH_STYLE); Ok(s3_config_builder) } + +/// Returns true if server-side encryption headers should be disabled +pub fn is_sse_disabled() -> bool { + *AWS_S3_DISABLE_SSE +} + +/// Returns true if checksum headers should be disabled +pub fn are_checksums_disabled() -> bool { + *AWS_S3_DISABLE_CHECKSUMS +} From 423c57e41505faf8ff96146de857a712798a607d Mon Sep 17 00:00:00 2001 From: Jason Overmier Date: Sun, 24 Aug 2025 04:44:50 +0000 Subject: [PATCH 2/2] refactor(aws_s3): extract helper method to reduce code duplication - Extract configure_multipart_upload_builder helper method - Eliminates code duplication between start_upload and start_client_driven_upload - Addresses GitHub Copilot review feedback for improved maintainability --- crates/aws_s3/src/storage.rs | 50 +++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/crates/aws_s3/src/storage.rs b/crates/aws_s3/src/storage.rs index c4d865e1a..216b586db 100644 --- a/crates/aws_s3/src/storage.rs +++ b/crates/aws_s3/src/storage.rs @@ -9,6 +9,7 @@ use async_trait::async_trait; use aws_config::retry::RetryConfig; use aws_sdk_s3::{ operation::{ + create_multipart_upload::builders::CreateMultipartUploadFluentBuilder, head_object::{ HeadObjectError, HeadObjectOutput, @@ -153,6 +154,27 @@ impl S3Storage { let bucket_name = s3_bucket_name(&use_case)?; S3Storage::new_with_prefix(bucket_name, key_prefix, runtime).await } + + /// Helper method to configure multipart upload builder with optional AWS headers + /// for S3 compatibility with non-AWS services + fn configure_multipart_upload_builder( + &self, + mut upload_builder: CreateMultipartUploadFluentBuilder, + ) -> CreateMultipartUploadFluentBuilder { + // Add server-side encryption if not disabled for S3 compatibility + if !is_sse_disabled() { + upload_builder = upload_builder.server_side_encryption(ServerSideEncryption::Aes256); + } + + // Add checksum algorithm if not disabled for S3 compatibility + if !are_checksums_disabled() { + // Because we're using multipart uploads, we're really specifying the part checksum + // algorithm here, so it needs to match what we use for each part. + upload_builder = upload_builder.checksum_algorithm(ChecksumAlgorithm::Crc32); + } + + upload_builder + } } async fn s3_client() -> Result { @@ -218,23 +240,13 @@ impl Storage for S3Storage { async fn start_upload(&self) -> anyhow::Result> { let key: ObjectKey = self.runtime.new_uuid_v4().to_string().try_into()?; let s3_key = S3Key(self.key_prefix.clone() + &key); - let mut upload_builder = self + let upload_builder = self .client .create_multipart_upload() .bucket(self.bucket.clone()) .key(&s3_key.0); - // Add server-side encryption if not disabled for S3 compatibility - if !is_sse_disabled() { - upload_builder = upload_builder.server_side_encryption(ServerSideEncryption::Aes256); - } - - // Add checksum algorithm if not disabled for S3 compatibility - if !are_checksums_disabled() { - // Because we're using multipart uploads, we're really specifying the part checksum - // algorithm here, so it needs to match what we use for each part. - upload_builder = upload_builder.checksum_algorithm(ChecksumAlgorithm::Crc32); - } + let upload_builder = self.configure_multipart_upload_builder(upload_builder); let output = upload_builder .send() @@ -266,23 +278,13 @@ impl Storage for S3Storage { async fn start_client_driven_upload(&self) -> anyhow::Result { let key: ObjectKey = self.runtime.new_uuid_v4().to_string().try_into()?; let s3_key = S3Key(self.key_prefix.clone() + &key); - let mut upload_builder = self + let upload_builder = self .client .create_multipart_upload() .bucket(self.bucket.clone()) .key(&s3_key.0); - // Add server-side encryption if not disabled for S3 compatibility - if !is_sse_disabled() { - upload_builder = upload_builder.server_side_encryption(ServerSideEncryption::Aes256); - } - - // Add checksum algorithm if not disabled for S3 compatibility - if !are_checksums_disabled() { - // Because we're using multipart uploads, we're really specifying the part checksum - // algorithm here, so it needs to match what we use for each part. - upload_builder = upload_builder.checksum_algorithm(ChecksumAlgorithm::Crc32); - } + let upload_builder = self.configure_multipart_upload_builder(upload_builder); let output = upload_builder .send()