Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;
import software.amazon.awssdk.core.ResponseInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -80,6 +83,7 @@
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AwsTransformer {

Expand Down Expand Up @@ -478,6 +482,57 @@ public UploadDirectoryRequest toUploadDirectoryRequest(DirectoryUploadRequest re
.build();
}

/**
 * Converts a DirectoryUploadRequest to the list of regular-file paths to upload.
 * Walks the local source directory and, when includeSubFolders is false, keeps
 * only files that live directly in the root of that directory.
 *
 * @param request the directory upload request
 * @return list of regular-file paths to upload (empty if the directory has no files)
 * @throws RuntimeException if the directory cannot be traversed (wraps the IOException)
 */
public List<Path> toFilePaths(DirectoryUploadRequest request) {
    Path sourceDir = Paths.get(request.getLocalSourceDirectory());

    // Files.walk must be closed; try-with-resources releases the directory handles.
    try (Stream<Path> paths = Files.walk(sourceDir)) {
        return paths
                .filter(Files::isRegularFile)
                // A null parent on the relativized path means the file sits in the root.
                .filter(path -> request.isIncludeSubFolders()
                        || sourceDir.relativize(path).getParent() == null)
                .collect(Collectors.toList());
    } catch (IOException e) {
        throw new RuntimeException("Failed to traverse directory: " + sourceDir, e);
    }
}

/**
 * Builds the blob key for a file by relativizing it against the source directory,
 * normalizing path separators to "/", and prepending the given prefix (if any).
 *
 * @param sourceDir the source directory path
 * @param filePath the file path to convert
 * @param prefix the S3 prefix to apply; may be null or empty for no prefix
 * @return the blob key
 */
public String toBlobKey(Path sourceDir, Path filePath, String prefix) {
    // S3 keys always use forward slashes, regardless of the local OS separator.
    String relativeKey = sourceDir.relativize(filePath).toString().replace("\\", "/");

    if (prefix == null || prefix.isEmpty()) {
        return relativeKey;
    }

    // Treat the prefix as a folder: insert a "/" between it and the key when missing.
    return prefix.endsWith("/") ? prefix + relativeKey : prefix + "/" + relativeKey;
}

public DirectoryUploadResponse toDirectoryUploadResponse(CompletedDirectoryUpload completedDirectoryUpload) {
return DirectoryUploadResponse.builder()
.failedTransfers(completedDirectoryUpload.failedTransfers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.salesforce.multicloudj.blob.driver.MultipartUploadResponse;
import com.salesforce.multicloudj.blob.driver.PresignedUrlRequest;
import com.salesforce.multicloudj.blob.driver.UploadPartResponse;
import com.salesforce.multicloudj.blob.driver.FailedBlobUpload;
import com.salesforce.multicloudj.blob.driver.UploadRequest;
import com.salesforce.multicloudj.blob.driver.UploadResponse;
import com.salesforce.multicloudj.common.aws.AwsConstants;
Expand Down Expand Up @@ -71,10 +72,13 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -351,9 +355,50 @@ protected CompletableFuture<DirectoryDownloadResponse> doDownloadDirectory(Direc

@Override
protected CompletableFuture<DirectoryUploadResponse> doUploadDirectory(DirectoryUploadRequest directoryUploadRequest) {
    try {
        Path sourceDir = Paths.get(directoryUploadRequest.getLocalSourceDirectory());
        List<Path> filePaths = transformer.toFilePaths(directoryUploadRequest);
        // Per-file failures are collected here rather than failing the whole directory upload.
        List<FailedBlobUpload> failedUploads = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> uploadFutures = new ArrayList<>(filePaths.size());

        for (Path filePath : filePaths) {
            uploadFutures.add(uploadDirectoryFile(directoryUploadRequest, sourceDir, filePath, failedUploads));
        }

        // Complete once every per-file upload has settled; failures were recorded above.
        return CompletableFuture.allOf(uploadFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> DirectoryUploadResponse.builder()
                        .failedTransfers(failedUploads)
                        .build());
    } catch (Exception e) {
        return CompletableFuture.failedFuture(new SubstrateSdkException("Failed to upload directory", e));
    }
}

/**
 * Asynchronously uploads a single file from a directory upload, recording any failure
 * in the shared list instead of propagating it.
 *
 * Composes on the upload's own future rather than blocking a pool thread with get(),
 * which the previous implementation did inside CompletableFuture.runAsync.
 *
 * @param request the originating directory upload request (supplies prefix and tags)
 * @param sourceDir root of the local directory being uploaded
 * @param filePath the file to upload
 * @param failedUploads thread-safe sink for failed transfers
 * @return a future that completes (never exceptionally) when this file's upload settles
 */
private CompletableFuture<Void> uploadDirectoryFile(DirectoryUploadRequest request,
                                                    Path sourceDir,
                                                    Path filePath,
                                                    List<FailedBlobUpload> failedUploads) {
    try {
        // Generate the blob key from the relative path plus the request prefix.
        String blobKey = transformer.toBlobKey(sourceDir, filePath, request.getPrefix());

        UploadRequest.Builder uploadRequestBuilder = UploadRequest.builder()
                .withKey(blobKey)
                .withContentLength(Files.size(filePath));

        // Tags are optional; apply them per-object since the directory API has no tag support.
        if (request.getTags() != null && !request.getTags().isEmpty()) {
            uploadRequestBuilder.withTags(request.getTags());
        }

        return doUpload(uploadRequestBuilder.build(), filePath)
                .handle((response, throwable) -> {
                    if (throwable != null) {
                        failedUploads.add(FailedBlobUpload.builder()
                                .source(filePath)
                                .exception(throwable instanceof Exception
                                        ? (Exception) throwable
                                        : new RuntimeException(throwable))
                                .build());
                    }
                    return null;
                });
    } catch (Exception e) {
        // Key generation or size lookup failed before the upload even started.
        failedUploads.add(FailedBlobUpload.builder()
                .source(filePath)
                .exception(e)
                .build());
        return CompletableFuture.completedFuture(null);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1182,4 +1182,5 @@ void testBuildS3ClientWithoutRetryConfig() {
assertNotNull(store);
assertEquals("bucket-1", store.getBucket());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,33 @@ void testToUploadDirectoryRequest() {
assertTrue(request.maxDepth().isPresent());
}

@Test
void testToUploadDirectoryRequest_WithTags() {
// Given: a directory upload request carrying two blob tags
Map<String, String> tags = Map.of("tag1", "value1", "tag2", "value2");
DirectoryUploadRequest directoryUploadRequest = DirectoryUploadRequest.builder()
.localSourceDirectory("/home/documents")
.prefix("/files")
.includeSubFolders(true)
.tags(tags)
.build();

// When: converting to the AWS SDK request type
UploadDirectoryRequest request = transformer.toUploadDirectoryRequest(directoryUploadRequest);

// Then: bucket, depth, prefix, and source are mapped; tags are intentionally absent
assertEquals(BUCKET, request.bucket());
assertTrue(request.maxDepth().isPresent());
// includeSubFolders=true maps to unbounded traversal depth
assertEquals(Integer.MAX_VALUE, request.maxDepth().getAsInt());
assertTrue(request.s3Prefix().isPresent());
assertEquals("/files", request.s3Prefix().get());
assertEquals("/home/documents", request.source().toString());

// Note: AWS SDK 2.35.0 doesn't support tagging in directory uploads via UploadDirectoryRequest
// Tags would need to be applied post-upload or when AWS SDK is upgraded
assertNotNull(request);
}

@Test
void testToDirectoryUploadResponse() {
Exception exception1 = new RuntimeException("Exception1!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1204,45 +1204,119 @@ void doDownloadDirectory() throws ExecutionException, InterruptedException {
}

@Test
void doUploadDirectory() throws ExecutionException, InterruptedException {
DirectoryUpload awsResponseFuture = mock(DirectoryUpload.class);
CompletedDirectoryUpload awsResponse = mock(CompletedDirectoryUpload.class);
doReturn(awsResponseFuture).when(mockS3TransferManager).uploadDirectory(any(UploadDirectoryRequest.class));
doReturn(future(awsResponse)).when(awsResponseFuture).completionFuture();

Exception failedUploadException = new RuntimeException("Fake exception!");
Path failedUploadPath = Paths.get("/home/documents/files/business/taxes.csv");
UploadFileRequest uploadFileRequest = UploadFileRequest.builder()
.source(failedUploadPath)
.putObjectRequest(mock(PutObjectRequest.class))
.build();
List<FailedFileUpload> failedTransfers = List.of(FailedFileUpload.builder()
.request(uploadFileRequest)
.exception(failedUploadException)
.build());
doReturn(failedTransfers).when(awsResponse).failedTransfers();

String source = "/home/documents";
DirectoryUploadRequest uploadRequest = DirectoryUploadRequest.builder()
.localSourceDirectory(source)
.prefix("files/")
.includeSubFolders(true)
.build();

// Perform the request
DirectoryUploadResponse response = aws.doUploadDirectory(uploadRequest).get();

// Verify the wiring
ArgumentCaptor<UploadDirectoryRequest> requestCaptor = ArgumentCaptor.forClass(UploadDirectoryRequest.class);
verify(mockS3TransferManager, times(1)).uploadDirectory(requestCaptor.capture());
var actualCapturedValue = requestCaptor.getValue();
assertEquals(BUCKET, actualCapturedValue.bucket());
assertEquals(source, actualCapturedValue.source().toString());
/**
 * Verifies that doUploadDirectory uploads every regular file under the source
 * directory (including subfolders) via per-object putObject calls, applying the
 * configured key prefix, and reports no failed transfers on success.
 */
void doUploadDirectory() throws ExecutionException, InterruptedException, IOException {
    // Create a temporary directory with test files
    Path tempDir = Files.createTempDirectory("test-upload-dir");
    try {
        // Create test files: one at the root and one inside a subdirectory
        Path file1 = tempDir.resolve("file1.txt");
        Path file2 = tempDir.resolve("subdir").resolve("file2.txt");
        Files.createDirectories(file2.getParent());
        Files.write(file1, "content1".getBytes());
        Files.write(file2, "content2".getBytes());

        // Mock putObject responses
        doReturn(CompletableFuture.completedFuture(buildMockPutObjectResponse()))
                .when(mockS3Client).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));

        DirectoryUploadRequest uploadRequest = DirectoryUploadRequest.builder()
                .localSourceDirectory(tempDir.toString())
                .prefix("files/")
                .includeSubFolders(true)
                .build();

        // Perform the request
        DirectoryUploadResponse response = aws.doUploadDirectory(uploadRequest).get();

        // Verify the results
        assertNotNull(response);
        assertTrue(response.getFailedTransfers().isEmpty());

        // Verify that putObject was called for each file
        ArgumentCaptor<PutObjectRequest> requestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
        verify(mockS3Client, times(2)).putObject(requestCaptor.capture(), any(AsyncRequestBody.class));

        List<PutObjectRequest> capturedRequests = requestCaptor.getAllValues();
        assertEquals(2, capturedRequests.size());

        // Verify bucket and keys carry the configured prefix
        for (PutObjectRequest putRequest : capturedRequests) {
            assertEquals(BUCKET, putRequest.bucket());
            assertTrue(putRequest.key().startsWith("files/"));
        }
    } finally {
        // Clean up: Files.walk returns a stream that must be closed, so use
        // try-with-resources; sort descending so children are deleted before parents.
        try (java.util.stream.Stream<Path> walked = Files.walk(tempDir)) {
            walked.sorted((a, b) -> b.compareTo(a))
                    .forEach(path -> {
                        try {
                            Files.deleteIfExists(path);
                        } catch (IOException e) {
                            // Ignore cleanup errors
                        }
                    });
        }
    }
}

// Verify the results
assertEquals(1, response.getFailedTransfers().size());
assertEquals(failedUploadException, response.getFailedTransfers().get(0).getException());
assertEquals(failedUploadPath, response.getFailedTransfers().get(0).getSource());
@Test
/**
 * Verifies that doUploadDirectory propagates request tags onto every per-object
 * putObject call (URL-encoded tagging header), alongside bucket and prefixed keys.
 */
void doUploadDirectory_WithTags() throws ExecutionException, InterruptedException, IOException {
    // Create a temporary directory with test files
    Path tempDir = Files.createTempDirectory("test-upload-dir-tags");
    try {
        // Create test files: one at the root and one inside a subdirectory
        Path file1 = tempDir.resolve("file1.txt");
        Path file2 = tempDir.resolve("subdir").resolve("file2.txt");
        Files.createDirectories(file2.getParent());
        Files.write(file1, "content1".getBytes());
        Files.write(file2, "content2".getBytes());

        Map<String, String> tags = Map.of("tag1", "value1", "tag2", "value2");

        // Mock putObject responses
        doReturn(CompletableFuture.completedFuture(buildMockPutObjectResponse()))
                .when(mockS3Client).putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class));

        DirectoryUploadRequest uploadRequest = DirectoryUploadRequest.builder()
                .localSourceDirectory(tempDir.toString())
                .prefix("files/")
                .includeSubFolders(true)
                .tags(tags)
                .build();

        // Perform the request
        DirectoryUploadResponse response = aws.doUploadDirectory(uploadRequest).get();

        // Verify the results
        assertNotNull(response);
        assertTrue(response.getFailedTransfers().isEmpty());

        // Verify that putObject was called for each file with tags
        ArgumentCaptor<PutObjectRequest> requestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
        verify(mockS3Client, times(2)).putObject(requestCaptor.capture(), any(AsyncRequestBody.class));

        List<PutObjectRequest> capturedRequests = requestCaptor.getAllValues();
        assertEquals(2, capturedRequests.size());

        // Verify tags are present in both requests
        for (PutObjectRequest putRequest : capturedRequests) {
            assertEquals(BUCKET, putRequest.bucket());
            assertTrue(putRequest.key().startsWith("files/"));
            assertNotNull(putRequest.tagging());
            assertTrue(putRequest.tagging().contains("tag1=value1"));
            assertTrue(putRequest.tagging().contains("tag2=value2"));
        }
    } finally {
        // Clean up: Files.walk returns a stream that must be closed, so use
        // try-with-resources; sort descending so children are deleted before parents.
        try (java.util.stream.Stream<Path> walked = Files.walk(tempDir)) {
            walked.sorted((a, b) -> b.compareTo(a))
                    .forEach(path -> {
                        try {
                            Files.deleteIfExists(path);
                        } catch (IOException e) {
                            // Ignore cleanup errors
                        }
                    });
        }
    }
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import lombok.Builder;
import lombok.Getter;

import java.util.Map;

import static java.util.Collections.unmodifiableMap;

/**
* Wrapper object for directory upload data
*/
Expand All @@ -12,4 +16,12 @@ public class DirectoryUploadRequest {
private final String localSourceDirectory;
private final String prefix;
private final boolean includeSubFolders;
/**
* (Optional parameter) The map of tagName to tagValue to be associated with all blobs in the directory
*/
private final Map<String, String> tags;

/**
 * Returns the tags to apply to all blobs in the directory.
 * Never returns null: absent tags yield an empty map, and present tags are
 * wrapped in an unmodifiable view so callers cannot mutate internal state.
 */
public Map<String, String> getTags() {
    if (tags == null) {
        return Map.of();
    }
    return unmodifiableMap(tags);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import com.salesforce.multicloudj.blob.driver.PresignedOperation;
import com.salesforce.multicloudj.blob.driver.PresignedUrlRequest;
import com.salesforce.multicloudj.blob.driver.UploadPartResponse;
import com.salesforce.multicloudj.blob.driver.DirectoryUploadRequest;
import com.salesforce.multicloudj.blob.driver.DirectoryUploadResponse;
import com.salesforce.multicloudj.blob.driver.DownloadRequest;
import com.salesforce.multicloudj.blob.driver.UploadRequest;
import com.salesforce.multicloudj.blob.driver.UploadResponse;
import com.salesforce.multicloudj.common.exceptions.InvalidArgumentException;
Expand Down
Loading
Loading