diff --git a/blob/blob-aws/src/main/java/com/salesforce/multicloudj/blob/aws/AwsTransformer.java b/blob/blob-aws/src/main/java/com/salesforce/multicloudj/blob/aws/AwsTransformer.java
index 6d63c84de..120e81d3f 100644
--- a/blob/blob-aws/src/main/java/com/salesforce/multicloudj/blob/aws/AwsTransformer.java
+++ b/blob/blob-aws/src/main/java/com/salesforce/multicloudj/blob/aws/AwsTransformer.java
@@ -70,7 +70,10 @@
 import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
 import software.amazon.awssdk.core.ResponseInputStream;
 
+import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -80,6 +83,7 @@
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class AwsTransformer {
 
@@ -478,6 +482,57 @@ public UploadDirectoryRequest toUploadDirectoryRequest(DirectoryUploadRequest re
                 .build();
     }
 
+    /**
+     * Converts a DirectoryUploadRequest to a list of file paths to upload.
+     * This method handles directory traversal and filtering based on the request parameters.
+     *
+     * @param request the directory upload request
+     * @return list of file paths to upload
+     */
+    public List<Path> toFilePaths(DirectoryUploadRequest request) {
+        Path sourceDir = Paths.get(request.getLocalSourceDirectory());
+        List<Path> filePaths = new ArrayList<>();
+
+        try (Stream<Path> paths = Files.walk(sourceDir)) {
+            filePaths = paths
+                    .filter(Files::isRegularFile)
+                    .filter(path -> {
+                        // If includeSubFolders is false, only include files in the root directory
+                        if (!request.isIncludeSubFolders()) {
+                            Path relativePath = sourceDir.relativize(path);
+                            return relativePath.getParent() == null;
+                        }
+                        return true;
+                    })
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to traverse directory: " + sourceDir, e);
+        }
+
+        return filePaths;
+    }
+
+    /**
+     * Converts a file path to a blob key by applying the prefix and maintaining directory structure.
+     *
+     * @param sourceDir the source directory path
+     * @param filePath the file path to convert
+     * @param prefix the S3 prefix to apply
+     * @return the blob key
+     */
+    public String toBlobKey(Path sourceDir, Path filePath, String prefix) {
+        Path relativePath = sourceDir.relativize(filePath);
+        String key = relativePath.toString().replace("\\", "/"); // Normalize path separators
+
+        if (prefix != null && !prefix.isEmpty()) {
+            // Ensure prefix ends with "/" if it doesn't already
+            String normalizedPrefix = prefix.endsWith("/") ? prefix : prefix + "/";
+            key = normalizedPrefix + key;
+        }
+
+        return key;
+    }
+
     public DirectoryUploadResponse toDirectoryUploadResponse(CompletedDirectoryUpload completedDirectoryUpload) {
         return DirectoryUploadResponse.builder()
                 .failedTransfers(completedDirectoryUpload.failedTransfers()
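Reviewer note: a minimal sketch of how the two new helpers compose, assuming a hypothetical local layout of `/tmp/docs/a.txt` and `/tmp/docs/sub/b.txt` and an already-constructed `AwsTransformer` named `transformer` (construction elided):

```java
import com.salesforce.multicloudj.blob.driver.DirectoryUploadRequest;
import java.nio.file.Path;
import java.nio.file.Paths;

DirectoryUploadRequest request = DirectoryUploadRequest.builder()
        .localSourceDirectory("/tmp/docs")  // hypothetical directory
        .prefix("files")                    // no trailing "/": toBlobKey appends one
        .includeSubFolders(true)
        .build();

Path sourceDir = Paths.get(request.getLocalSourceDirectory());
for (Path filePath : transformer.toFilePaths(request)) {
    // e.g. /tmp/docs/sub/b.txt -> "files/sub/b.txt" (relativized, "\" -> "/", prefix applied)
    String key = transformer.toBlobKey(sourceDir, filePath, request.getPrefix());
}
```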
diff --git a/blob/blob-aws/src/main/java/com/salesforce/multicloudj/blob/aws/async/AwsAsyncBlobStore.java b/blob/blob-aws/src/main/java/com/salesforce/multicloudj/blob/aws/async/AwsAsyncBlobStore.java
index a0fffc9c8..f48e1e03d 100644
--- a/blob/blob-aws/src/main/java/com/salesforce/multicloudj/blob/aws/async/AwsAsyncBlobStore.java
+++ b/blob/blob-aws/src/main/java/com/salesforce/multicloudj/blob/aws/async/AwsAsyncBlobStore.java
@@ -29,6 +29,7 @@
 import com.salesforce.multicloudj.blob.driver.MultipartUploadResponse;
 import com.salesforce.multicloudj.blob.driver.PresignedUrlRequest;
 import com.salesforce.multicloudj.blob.driver.UploadPartResponse;
+import com.salesforce.multicloudj.blob.driver.FailedBlobUpload;
 import com.salesforce.multicloudj.blob.driver.UploadRequest;
 import com.salesforce.multicloudj.blob.driver.UploadResponse;
 import com.salesforce.multicloudj.common.aws.AwsConstants;
@@ -71,10 +72,13 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URL;
+import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -351,9 +355,50 @@ protected CompletableFuture<DirectoryDownloadResponse> doDownloadDirectory(Direc
 
     @Override
     protected CompletableFuture<DirectoryUploadResponse> doUploadDirectory(DirectoryUploadRequest directoryUploadRequest) {
-        return transferManager.uploadDirectory(transformer.toUploadDirectoryRequest(directoryUploadRequest))
-                .completionFuture()
-                .thenApply(transformer::toDirectoryUploadResponse);
+        try {
+            Path sourceDir = Paths.get(directoryUploadRequest.getLocalSourceDirectory());
+            List<Path> filePaths = transformer.toFilePaths(directoryUploadRequest);
+            List<CompletableFuture<Void>> uploadFutures = new ArrayList<>();
+            List<FailedBlobUpload> failedUploads = Collections.synchronizedList(new ArrayList<>());
+
+            for (Path filePath : filePaths) {
+                CompletableFuture<Void> uploadFuture = CompletableFuture.runAsync(() -> {
+                    try {
+                        // Generate blob key
+                        String blobKey = transformer.toBlobKey(sourceDir, filePath, directoryUploadRequest.getPrefix());
+
+                        // Create UploadRequest with tags if provided
+                        UploadRequest.Builder uploadRequestBuilder = UploadRequest.builder()
+                                .withKey(blobKey)
+                                .withContentLength(Files.size(filePath));
+
+                        if (directoryUploadRequest.getTags() != null && !directoryUploadRequest.getTags().isEmpty()) {
+                            uploadRequestBuilder.withTags(directoryUploadRequest.getTags());
+                        }
+
+                        UploadRequest uploadRequest = uploadRequestBuilder.build();
+
+                        // Upload file with tags
+                        doUpload(uploadRequest, filePath).get();
+                    } catch (Exception e) {
+                        failedUploads.add(FailedBlobUpload.builder()
+                                .source(filePath)
+                                .exception(e)
+                                .build());
+                    }
+                });
+
+                uploadFutures.add(uploadFuture);
+            }
+
+            // Wait for all uploads to complete
+            return CompletableFuture.allOf(uploadFutures.toArray(new CompletableFuture[0]))
+                    .thenApply(v -> DirectoryUploadResponse.builder()
+                            .failedTransfers(failedUploads)
+                            .build());
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(new SubstrateSdkException("Failed to upload directory", e));
+        }
     }
 
     @Override
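The rewrite above trades `S3TransferManager.uploadDirectory` for per-file uploads because the transfer manager exposes no per-object tagging hook (see the SDK note in the `AwsTransformerTest` change below). One consequence worth flagging: per-file failures are collected in the response rather than failing the returned future. A caller sketch, assuming the public wrapper is `uploadDirectory` as in the GCP tests and `store` is a concrete async blob store instance (both names illustrative):

```java
import com.salesforce.multicloudj.blob.driver.DirectoryUploadRequest;
import com.salesforce.multicloudj.blob.driver.DirectoryUploadResponse;
import com.salesforce.multicloudj.blob.driver.FailedBlobUpload;
import java.util.Map;

DirectoryUploadRequest request = DirectoryUploadRequest.builder()
        .localSourceDirectory("/tmp/docs")   // hypothetical
        .prefix("files/")
        .includeSubFolders(true)
        .tags(Map.of("team", "storage"))     // hypothetical tags
        .build();

// The future completes normally even when individual files fail;
// callers must inspect getFailedTransfers().
DirectoryUploadResponse response = store.uploadDirectory(request).get();
for (FailedBlobUpload failure : response.getFailedTransfers()) {
    System.err.println(failure.getSource() + " failed: " + failure.getException());
}
```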
diff --git a/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/AwsBlobStoreTest.java b/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/AwsBlobStoreTest.java
index 1f4852fc6..b5fc2a686 100644
--- a/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/AwsBlobStoreTest.java
+++ b/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/AwsBlobStoreTest.java
@@ -1182,4 +1182,5 @@ void testBuildS3ClientWithoutRetryConfig() {
         assertNotNull(store);
         assertEquals("bucket-1", store.getBucket());
     }
+
 }
diff --git a/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/AwsTransformerTest.java b/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/AwsTransformerTest.java
index ad96570e5..e4f283a3e 100644
--- a/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/AwsTransformerTest.java
+++ b/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/AwsTransformerTest.java
@@ -668,6 +668,33 @@ void testToUploadDirectoryRequest() {
         assertTrue(request.maxDepth().isPresent());
     }
 
+    @Test
+    void testToUploadDirectoryRequest_WithTags() {
+        // Given
+        Map<String, String> tags = Map.of("tag1", "value1", "tag2", "value2");
+        DirectoryUploadRequest directoryUploadRequest = DirectoryUploadRequest.builder()
+                .localSourceDirectory("/home/documents")
+                .prefix("/files")
+                .includeSubFolders(true)
+                .tags(tags)
+                .build();
+
+        // When
+        UploadDirectoryRequest request = transformer.toUploadDirectoryRequest(directoryUploadRequest);
+
+        // Then
+        assertEquals(BUCKET, request.bucket());
+        assertTrue(request.maxDepth().isPresent());
+        assertEquals(Integer.MAX_VALUE, request.maxDepth().getAsInt());
+        assertTrue(request.s3Prefix().isPresent());
+        assertEquals("/files", request.s3Prefix().get());
+        assertEquals("/home/documents", request.source().toString());
+
+        // Note: AWS SDK 2.35.0 doesn't support tagging in directory uploads via UploadDirectoryRequest;
+        // tags would need to be applied post-upload, or once the AWS SDK is upgraded
+        assertNotNull(request);
+    }
+
     @Test
     void testToDirectoryUploadResponse() {
         Exception exception1 = new RuntimeException("Exception1!");
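On the note in the new test: until the SDK supports tags on `UploadDirectoryRequest`, the usual post-upload fallback in SDK v2 is `PutObjectTagging`. A hedged sketch, assuming an existing `S3Client`; the helper name and tag values are illustrative and not part of this PR:

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;

// Illustrative fallback: tag one object after the transfer manager has uploaded it.
static void tagAfterUpload(S3Client s3, String bucket, String key) {
    s3.putObjectTagging(PutObjectTaggingRequest.builder()
            .bucket(bucket)
            .key(key)
            .tagging(Tagging.builder()
                    .tagSet(Tag.builder().key("tag1").value("value1").build())
                    .build())
            .build());
}
```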
diff --git a/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/async/AwsAsyncBlobStoreTest.java b/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/async/AwsAsyncBlobStoreTest.java
index a6fb64633..b80ac9ad5 100644
--- a/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/async/AwsAsyncBlobStoreTest.java
+++ b/blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/async/AwsAsyncBlobStoreTest.java
@@ -1204,45 +1204,119 @@ void doDownloadDirectory() throws ExecutionException, InterruptedException {
     }
 
     @Test
-    void doUploadDirectory() throws ExecutionException, InterruptedException {
-        DirectoryUpload awsResponseFuture = mock(DirectoryUpload.class);
-        CompletedDirectoryUpload awsResponse = mock(CompletedDirectoryUpload.class);
-        doReturn(awsResponseFuture).when(mockS3TransferManager).uploadDirectory(any(UploadDirectoryRequest.class));
-        doReturn(future(awsResponse)).when(awsResponseFuture).completionFuture();
-
-        Exception failedUploadException = new RuntimeException("Fake exception!");
-        Path failedUploadPath = Paths.get("/home/documents/files/business/taxes.csv");
-        UploadFileRequest uploadFileRequest = UploadFileRequest.builder()
-                .source(failedUploadPath)
-                .putObjectRequest(mock(PutObjectRequest.class))
-                .build();
-        List<FailedFileUpload> failedTransfers = List.of(FailedFileUpload.builder()
-                .request(uploadFileRequest)
-                .exception(failedUploadException)
-                .build());
-        doReturn(failedTransfers).when(awsResponse).failedTransfers();
-
-        String source = "/home/documents";
-        DirectoryUploadRequest uploadRequest = DirectoryUploadRequest.builder()
-                .localSourceDirectory(source)
-                .prefix("files/")
-                .includeSubFolders(true)
-                .build();
-
-        // Perform the request
-        DirectoryUploadResponse response = aws.doUploadDirectory(uploadRequest).get();
-
-        // Verify the wiring
-        ArgumentCaptor<UploadDirectoryRequest> requestCaptor = ArgumentCaptor.forClass(UploadDirectoryRequest.class);
-        verify(mockS3TransferManager, times(1)).uploadDirectory(requestCaptor.capture());
-        var actualCapturedValue = requestCaptor.getValue();
-        assertEquals(BUCKET, actualCapturedValue.bucket());
-        assertEquals(source, actualCapturedValue.source().toString());
+    void doUploadDirectory() throws ExecutionException, InterruptedException, IOException {
+        // Create a temporary directory with test files
+        Path tempDir = Files.createTempDirectory("test-upload-dir");
+        try {
+            // Create test files
+            Path file1 = tempDir.resolve("file1.txt");
+            Path file2 = tempDir.resolve("subdir").resolve("file2.txt");
+            Files.createDirectories(file2.getParent());
+            Files.write(file1, "content1".getBytes());
+            Files.write(file2, "content2".getBytes());
+
+            // Mock putObject responses
+            doReturn(CompletableFuture.completedFuture(buildMockPutObjectResponse()))
+                    .when(mockS3Client).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));
+
+            DirectoryUploadRequest uploadRequest = DirectoryUploadRequest.builder()
+                    .localSourceDirectory(tempDir.toString())
+                    .prefix("files/")
+                    .includeSubFolders(true)
+                    .build();
+
+            // Perform the request
+            DirectoryUploadResponse response = aws.doUploadDirectory(uploadRequest).get();
+
+            // Verify the results
+            assertNotNull(response);
+            assertTrue(response.getFailedTransfers().isEmpty());
+
+            // Verify that putObject was called for each file
+            ArgumentCaptor<PutObjectRequest> requestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
+            verify(mockS3Client, times(2)).putObject(requestCaptor.capture(), any(AsyncRequestBody.class));
+
+            List<PutObjectRequest> capturedRequests = requestCaptor.getAllValues();
+            assertEquals(2, capturedRequests.size());
+
+            // Verify bucket and keys
+            for (PutObjectRequest putRequest : capturedRequests) {
+                assertEquals(BUCKET, putRequest.bucket());
+                assertTrue(putRequest.key().startsWith("files/"));
+            }
+        } finally {
+            // Clean up
+            Files.walk(tempDir)
+                    .sorted((a, b) -> b.compareTo(a))
+                    .forEach(path -> {
+                        try {
+                            Files.deleteIfExists(path);
+                        } catch (IOException e) {
+                            // Ignore cleanup errors
+                        }
+                    });
+        }
+    }
-
-        // Verify the results
-        assertEquals(1, response.getFailedTransfers().size());
-        assertEquals(failedUploadException, response.getFailedTransfers().get(0).getException());
-        assertEquals(failedUploadPath, response.getFailedTransfers().get(0).getSource());
+
+    @Test
+    void doUploadDirectory_WithTags() throws ExecutionException, InterruptedException, IOException {
+        // Create a temporary directory with test files
+        Path tempDir = Files.createTempDirectory("test-upload-dir-tags");
+        try {
+            // Create test files
+            Path file1 = tempDir.resolve("file1.txt");
+            Path file2 = tempDir.resolve("subdir").resolve("file2.txt");
+            Files.createDirectories(file2.getParent());
+            Files.write(file1, "content1".getBytes());
+            Files.write(file2, "content2".getBytes());
+
+            Map<String, String> tags = Map.of("tag1", "value1", "tag2", "value2");
+
+            // Mock putObject responses
+            doReturn(CompletableFuture.completedFuture(buildMockPutObjectResponse()))
+                    .when(mockS3Client).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));
+
+            DirectoryUploadRequest uploadRequest = DirectoryUploadRequest.builder()
+                    .localSourceDirectory(tempDir.toString())
+                    .prefix("files/")
+                    .includeSubFolders(true)
+                    .tags(tags)
+                    .build();
+
+            // Perform the request
+            DirectoryUploadResponse response = aws.doUploadDirectory(uploadRequest).get();
+
+            // Verify the results
+            assertNotNull(response);
+            assertTrue(response.getFailedTransfers().isEmpty());
+
+            // Verify that putObject was called for each file with tags
+            ArgumentCaptor<PutObjectRequest> requestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
+            verify(mockS3Client, times(2)).putObject(requestCaptor.capture(), any(AsyncRequestBody.class));
+
+            List<PutObjectRequest> capturedRequests = requestCaptor.getAllValues();
+            assertEquals(2, capturedRequests.size());
+
+            // Verify tags are present in both requests
+            for (PutObjectRequest putRequest : capturedRequests) {
+                assertEquals(BUCKET, putRequest.bucket());
+                assertTrue(putRequest.key().startsWith("files/"));
+                assertNotNull(putRequest.tagging());
+                assertTrue(putRequest.tagging().contains("tag1=value1"));
+                assertTrue(putRequest.tagging().contains("tag2=value2"));
+            }
+        } finally {
+            // Clean up
+            Files.walk(tempDir)
+                    .sorted((a, b) -> b.compareTo(a))
+                    .forEach(path -> {
+                        try {
+                            Files.deleteIfExists(path);
+                        } catch (IOException e) {
+                            // Ignore cleanup errors
+                        }
+                    });
+        }
     }
 
     @Test
diff --git a/blob/blob-client/src/main/java/com/salesforce/multicloudj/blob/driver/DirectoryUploadRequest.java b/blob/blob-client/src/main/java/com/salesforce/multicloudj/blob/driver/DirectoryUploadRequest.java
index f43a99cae..a4f786132 100644
--- a/blob/blob-client/src/main/java/com/salesforce/multicloudj/blob/driver/DirectoryUploadRequest.java
+++ b/blob/blob-client/src/main/java/com/salesforce/multicloudj/blob/driver/DirectoryUploadRequest.java
@@ -3,6 +3,10 @@
 import lombok.Builder;
 import lombok.Getter;
 
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
 /**
  * Wrapper object for directory upload data
  */
@@ -12,4 +16,12 @@ public class DirectoryUploadRequest {
     private final String localSourceDirectory;
     private final String prefix;
     private final boolean includeSubFolders;
+    /**
+     * (Optional) A map of tag names to tag values to associate with every blob in the directory
+     */
+    private final Map<String, String> tags;
+
+    public Map<String, String> getTags() {
+        return tags == null ? Map.of() : unmodifiableMap(tags);
+    }
 }
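The contract of the new field, in short: `tags` is optional, and `getTags()` never returns null and never exposes a mutable map. A quick illustration (the directory path and tag values are hypothetical):

```java
import com.salesforce.multicloudj.blob.driver.DirectoryUploadRequest;
import java.util.Map;

DirectoryUploadRequest tagged = DirectoryUploadRequest.builder()
        .localSourceDirectory("/tmp/docs")
        .tags(Map.of("tag1", "value1"))
        .build();
DirectoryUploadRequest untagged = DirectoryUploadRequest.builder()
        .localSourceDirectory("/tmp/docs")
        .build();

tagged.getTags().get("tag1");     // "value1", via an unmodifiable view
untagged.getTags().isEmpty();     // true: Map.of(), never null
tagged.getTags().put("x", "y");   // throws UnsupportedOperationException
```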
diff --git a/blob/blob-client/src/test/java/com/salesforce/multicloudj/blob/client/AbstractBlobStoreIT.java b/blob/blob-client/src/test/java/com/salesforce/multicloudj/blob/client/AbstractBlobStoreIT.java
index b3e85dced..988d41fd5 100644
--- a/blob/blob-client/src/test/java/com/salesforce/multicloudj/blob/client/AbstractBlobStoreIT.java
+++ b/blob/blob-client/src/test/java/com/salesforce/multicloudj/blob/client/AbstractBlobStoreIT.java
@@ -19,6 +19,9 @@
 import com.salesforce.multicloudj.blob.driver.PresignedOperation;
 import com.salesforce.multicloudj.blob.driver.PresignedUrlRequest;
 import com.salesforce.multicloudj.blob.driver.UploadPartResponse;
+import com.salesforce.multicloudj.blob.driver.DirectoryUploadRequest;
+import com.salesforce.multicloudj.blob.driver.DirectoryUploadResponse;
+import com.salesforce.multicloudj.blob.driver.DownloadRequest;
 import com.salesforce.multicloudj.blob.driver.UploadRequest;
 import com.salesforce.multicloudj.blob.driver.UploadResponse;
 import com.salesforce.multicloudj.common.exceptions.InvalidArgumentException;
diff --git a/blob/blob-gcp/src/main/java/com/salesforce/multicloudj/blob/gcp/GcpBlobStore.java b/blob/blob-gcp/src/main/java/com/salesforce/multicloudj/blob/gcp/GcpBlobStore.java
index 122b909c0..8b4f5968b 100644
--- a/blob/blob-gcp/src/main/java/com/salesforce/multicloudj/blob/gcp/GcpBlobStore.java
+++ b/blob/blob-gcp/src/main/java/com/salesforce/multicloudj/blob/gcp/GcpBlobStore.java
@@ -518,8 +518,17 @@ protected DirectoryUploadResponse doUploadDirectory(DirectoryUploadRequest direc
             // Generate blob key
             String blobKey = transformer.toBlobKey(sourceDir, filePath, directoryUploadRequest.getPrefix());
 
-            // Upload file to GCS - use same approach as single file upload
-            com.google.cloud.storage.BlobInfo blobInfo = com.google.cloud.storage.BlobInfo.newBuilder(getBucket(), blobKey).build();
+            // Build metadata map with tags if provided
+            Map<String, String> metadata = new HashMap<>();
+            if (directoryUploadRequest.getTags() != null && !directoryUploadRequest.getTags().isEmpty()) {
+                directoryUploadRequest.getTags().forEach((tagName, tagValue) ->
+                        metadata.put(TAG_PREFIX + tagName, tagValue));
+            }
+
+            // Upload file to GCS with tags applied
+            com.google.cloud.storage.BlobInfo blobInfo = com.google.cloud.storage.BlobInfo.newBuilder(getBucket(), blobKey)
+                    .setMetadata(metadata.isEmpty() ? null : metadata)
+                    .build();
             storage.createFrom(blobInfo, filePath);
         } catch (Exception e) {
             failedUploads.add(FailedBlobUpload.builder()
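GCS has no object-tagging API equivalent to S3's, so this change encodes tags as object metadata under `TAG_PREFIX` (the tests below pin the prefix to `gcp-tag-`). If a symmetric read path is ever needed, the inverse mapping is mechanical; a sketch with an illustrative helper name, not part of this PR:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative inverse of the write path above: recover tags from blob metadata.
static Map<String, String> tagsFromMetadata(Map<String, String> metadata, String tagPrefix) {
    Map<String, String> tags = new HashMap<>();
    if (metadata != null) {
        metadata.forEach((key, value) -> {
            if (key.startsWith(tagPrefix)) {
                tags.put(key.substring(tagPrefix.length()), value);
            }
        });
    }
    return tags;
}
```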
diff --git a/blob/blob-gcp/src/test/java/com/salesforce/multicloudj/blob/gcp/GcpBlobStoreTest.java b/blob/blob-gcp/src/test/java/com/salesforce/multicloudj/blob/gcp/GcpBlobStoreTest.java
index 59e803e02..f14bc4923 100644
--- a/blob/blob-gcp/src/test/java/com/salesforce/multicloudj/blob/gcp/GcpBlobStoreTest.java
+++ b/blob/blob-gcp/src/test/java/com/salesforce/multicloudj/blob/gcp/GcpBlobStoreTest.java
@@ -100,6 +100,7 @@
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -1941,6 +1942,53 @@ void testUploadDirectory_Success() throws Exception {
         verify(mockStorage).createFrom(any(BlobInfo.class), eq(file2));
     }
 
+    @Test
+    void testUploadDirectory_WithTags() throws Exception {
+        // Given
+        Map<String, String> tags = Map.of("tag1", "value1", "tag2", "value2");
+        DirectoryUploadRequest request = DirectoryUploadRequest.builder()
+                .localSourceDirectory(tempDir.toString())
+                .prefix("uploads/")
+                .includeSubFolders(true)
+                .tags(tags)
+                .build();
+
+        // Create test files in temp directory
+        Path file1 = tempDir.resolve("file1.txt");
+        Path file2 = tempDir.resolve("subdir").resolve("file2.txt");
+        Files.createDirectories(file2.getParent());
+        Files.write(file1, "content1".getBytes());
+        Files.write(file2, "content2".getBytes());
+
+        List<Path> filePaths = List.of(file1, file2);
+        when(mockTransformer.toFilePaths(request)).thenReturn(filePaths);
+        when(mockTransformer.toBlobKey(eq(tempDir), eq(file1), eq("uploads/")))
+                .thenReturn("uploads/file1.txt");
+        when(mockTransformer.toBlobKey(eq(tempDir), eq(file2), eq("uploads/")))
+                .thenReturn("uploads/subdir/file2.txt");
+
+        // When
+        DirectoryUploadResponse response = gcpBlobStore.uploadDirectory(request);
+
+        // Then
+        assertNotNull(response);
+        assertTrue(response.getFailedTransfers().isEmpty());
+
+        // Verify that tags are applied to both files
+        ArgumentCaptor<BlobInfo> blobInfoCaptor = ArgumentCaptor.forClass(BlobInfo.class);
+        verify(mockStorage, times(2)).createFrom(blobInfoCaptor.capture(), any(Path.class));
+
+        List<BlobInfo> capturedBlobInfos = blobInfoCaptor.getAllValues();
+        assertEquals(2, capturedBlobInfos.size());
+
+        // Verify tags are present in metadata with TAG_PREFIX
+        for (BlobInfo blobInfo : capturedBlobInfos) {
+            assertNotNull(blobInfo.getMetadata());
+            assertEquals("value1", blobInfo.getMetadata().get("gcp-tag-tag1"));
+            assertEquals("value2", blobInfo.getMetadata().get("gcp-tag-tag2"));
+        }
+    }
+
     @Test
     void testUploadDirectory_WithFailures() throws Exception {
         // Given