Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable S3BatchDelete #2965

Merged
merged 27 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6ac91f2
[WIP] s3BatchDelete
snalli Dec 13, 2024
e9ea4a1
initial method for parsing objects from request
allenaverbukh Dec 19, 2024
5b0bf21
attempted to formulate new path for newrequest of single delete
allenaverbukh Dec 20, 2024
aa43702
current version
allenaverbukh Jan 3, 2025
a268efa
moved code to callback
allenaverbukh Jan 3, 2025
2280b8c
populated response via xmlmapper, exception handling
allenaverbukh Jan 14, 2025
a91a723
added latest
allenaverbukh Jan 15, 2025
6a51203
addressed comments from latest PR
allenaverbukh Jan 17, 2025
5716b02
UT
allenaverbukh Jan 21, 2025
0b8de6e
modified xml input
allenaverbukh Jan 22, 2025
05de8f8
block at very end
allenaverbukh Jan 23, 2025
5cd2258
changed response XML and updated UT
allenaverbukh Jan 27, 2025
3b33691
modified code to be of type int
allenaverbukh Jan 28, 2025
782894c
removed todo
allenaverbukh Jan 28, 2025
86bea0a
addressed pr comments
allenaverbukh Jan 28, 2025
7741411
final cleanup
allenaverbukh Jan 28, 2025
8399cf0
formatting
allenaverbukh Jan 28, 2025
ba6665f
refined final comments
allenaverbukh Jan 29, 2025
346853b
removed s from keys
allenaverbukh Jan 29, 2025
d6fe619
removed extra space
allenaverbukh Jan 29, 2025
21935d9
deleted extra file
allenaverbukh Jan 30, 2025
c1446db
added constant to correct file
allenaverbukh Jan 30, 2025
dec9426
changed xml to be nonconcurrent
allenaverbukh Jan 30, 2025
f26fb82
removed all extra logger statements
allenaverbukh Jan 30, 2025
d1a38f4
address more comments
allenaverbukh Jan 30, 2025
a6f1d10
updated tests
allenaverbukh Jan 30, 2025
ea23ccc
final modifications
allenaverbukh Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ public static NamedBlobPath parse(String path, Map<String, Object> args) throws
* @return the {@link NamedBlobPath} that indicates the parsing result from blobUrl.
* @throws RestServiceException on parsing errors.
*/

// account/container/key
// ->
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
// account/container/? delete
// namedblobb requires key and prefix
// alter this method since it expects key or prefix but we dont have either

public static NamedBlobPath parseS3(String path, Map<String, Object> args) throws RestServiceException {
path = path.startsWith("/") ? path.substring(1) : path;
String[] splitPath = path.split("/", 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,59 @@ public String toString() {
return "Bucket=" + bucket + ", Key=" + key + ", UploadId=" + uploadId;
}
}

public static class S3BatchDeleteObjects {
@JacksonXmlProperty(localName = "Object")
private static List<S3BatchDeleteKeys> objects;

public static List<S3BatchDeleteKeys> getObjects() {
return objects;
}

@Override
public String toString() {
return "Objects=" + objects;
}
}

public static class S3BatchDeleteKeys {
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
@JacksonXmlProperty(localName = "Key") // Maps to the <Key> element
private String key;

public String getKey() {
return key;
}

@Override
public String toString() {
return "Key=" + key;
}

}

public static class S3BatchDeleteResponse {
@JacksonXmlProperty(localName = "deleted")
private List<String> deleted;

@JacksonXmlProperty(localName = "errors")
private List<String> success;
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved

// Getters and Setters
public List<String> getDeleted() {
return deleted;
}

public void setDeleted(List<String> deleted) {
this.deleted = deleted;
}

public List<String> getErrors() {
return success;
}

public void setErrors(List<String> success) {
this.success = success;
}
}

}
15 changes: 15 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/rest/RequestPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ public static RequestPath parse(RestRequest restRequest, List<String> prefixesTo
* this path segment.
* @return a {@link RequestPath} object.
*/

allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved


// string path = /named/account/container/key
// getargs
// []
//


public static RequestPath parse(String path, Map<String, Object> args, List<String> prefixesToRemove,
String clusterName) throws RestServiceException {
int offset = 0;
Expand Down Expand Up @@ -312,6 +321,12 @@ private static int matchPathSegments(String path, int pathOffset, String pathSeg
return nextPathSegmentOffset;
}


// s3/account/container/key is for delete (prior to conversion) ---> s3 to named
// /s3/[account_name]/[container_name]? for batch delete



/**
* Get named blob path from S3 request path
* @param path s3 request path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class RestUtils {
public static final String PATH_SEPARATOR_STRING = "/";
public static final String STITCH = "STITCH";
public static final String UPLOADS_QUERY_PARAM = "uploads";
public static final String BATCH_DELETE_QUERY_PARAM = "delete";
public static final String UPLOAD_ID_QUERY_PARAM = "uploadId";
public static final String CONTINUE = "100-continue";
public static final String OBJECT_LOCK_PARAM = "object-lock";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private CallbackChain(RestRequest restRequest, RestResponseChannel restResponseC
/**
* Start the chain by calling {@link SecurityService#processRequest}.
*/
private void start() {
private void start() {
restRequest.setArg(RestUtils.InternalKeys.KEEP_ALIVE_ON_ERROR_HINT, true);
securityService.processRequest(restRequest, securityProcessRequestCallback());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.commons.Callback;
import com.github.ambry.config.FrontendConfig;
import com.github.ambry.frontend.s3.S3BatchDeleteHandler;
import com.github.ambry.frontend.s3.S3DeleteHandler;
import com.github.ambry.frontend.s3.S3GetHandler;
import com.github.ambry.frontend.s3.S3HeadHandler;
Expand Down Expand Up @@ -113,6 +114,7 @@ class FrontendRestRequestService implements RestRequestService {
private PostDatasetsHandler postDatasetsHandler;
private GetStatsReportHandler getStatsReportHandler;
private S3DeleteHandler s3DeleteHandler;
private S3BatchDeleteHandler s3BatchDeleteHandler;
private S3ListHandler s3ListHandler;
private S3PutHandler s3PutHandler;
private S3HeadHandler s3HeadHandler;
Expand Down Expand Up @@ -240,7 +242,8 @@ public void start() throws InstantiationException {
new S3MultipartUploadHandler(securityService, frontendMetrics, accountAndContainerInjector, frontendConfig,
namedBlobDb, idConverter, router, quotaManager);
s3DeleteHandler = new S3DeleteHandler(deleteBlobHandler, s3MultipartUploadHandler, frontendMetrics);
s3PostHandler = new S3PostHandler(s3MultipartUploadHandler);
s3BatchDeleteHandler = new S3BatchDeleteHandler(deleteBlobHandler, frontendMetrics);
s3PostHandler = new S3PostHandler(s3MultipartUploadHandler, s3BatchDeleteHandler);
s3PutHandler = new S3PutHandler(namedBlobPutHandler, s3MultipartUploadHandler, frontendMetrics);
s3ListHandler = new S3ListHandler(namedBlobListHandler, frontendMetrics);
s3GetHandler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* require a response body, otherwise it is {@link ReadableStreamChannel}.
*/
abstract public class S3BaseHandler<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(S3BaseHandler.class);
protected static final Logger LOGGER = LoggerFactory.getLogger(S3BaseHandler.class);

/**
* Handles the S3 request and construct the response.
Expand Down Expand Up @@ -122,6 +122,11 @@ public static boolean isMultipartCreateUploadRequest(RestRequest restRequest) {
&& restRequest.getArgs().containsKey(UPLOADS_QUERY_PARAM);
}

public static boolean isBatchDelete(RestRequest restRequest) {
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
return restRequest.getRestMethod() == RestMethod.POST && restRequest.getArgs().containsKey(S3_REQUEST)
&& restRequest.getArgs().containsKey(BATCH_DELETE_QUERY_PARAM);
}

/**
* @param restRequest the {@link RestRequest} that contains the request parameters.
* @return {@code True} if it is a completion/abortion of multipart uploads.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package com.github.ambry.frontend.s3;
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved

import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import com.github.ambry.commons.ByteBufferReadableStreamChannel;
import com.github.ambry.commons.Callback;
import com.github.ambry.frontend.DeleteBlobHandler;
import com.github.ambry.frontend.FrontendMetrics;
import com.github.ambry.rest.NoOpResponseChannel;
import com.github.ambry.rest.RequestPath;
import com.github.ambry.rest.ResponseStatus;
import com.github.ambry.rest.RestRequest;
import com.github.ambry.rest.RestResponseChannel;
import com.github.ambry.commons.RetainingAsyncWritableChannel;
import com.github.ambry.rest.RestServiceErrorCode;
import com.github.ambry.rest.RestServiceException;
import com.github.ambry.rest.RestUtils;
import com.github.ambry.rest.WrappedRestRequest;
import com.github.ambry.router.ReadableStreamChannel;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static com.github.ambry.frontend.FrontendUtils.*;
import static com.github.ambry.rest.RestUtils.*;

public class S3BatchDeleteHandler extends S3BaseHandler<ReadableStreamChannel> {
private static DeleteBlobHandler deleteBlobHandler;
private ArrayList<String> deleted = new ArrayList<>();
private ArrayList<String> errors = new ArrayList<>();
private boolean failedRequest;
private FrontendMetrics frontendMetrics;

// Constructor
public S3BatchDeleteHandler(DeleteBlobHandler deleteBlobHandler, FrontendMetrics frontendMetrics) {
if (frontendMetrics == null) {
throw new IllegalStateException("FrontendMetrics should not be null");
}

if (frontendMetrics.deleteBlobRouterMetrics == null) {
throw new IllegalStateException("deleteBlobRouterMetrics should not be null");
}

this.deleteBlobHandler = deleteBlobHandler;
this.failedRequest = false;
this.frontendMetrics = frontendMetrics;
}
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved

/**
* Handles the S3 request and constructs the response.
*
* @param restRequest the {@link RestRequest} that contains the request headers and body.
* @param restResponseChannel the {@link RestResponseChannel} that contains the response headers and body.
* @param callback the {@link Callback} to invoke when the response is ready (or if there is an exception).
* @throws RestServiceException exception when the processing fails
*/
@Override
protected void doHandle(RestRequest restRequest, RestResponseChannel restResponseChannel,
Callback<ReadableStreamChannel> callback) throws RestServiceException {

// Create the channel to read the request body
RetainingAsyncWritableChannel channel = new RetainingAsyncWritableChannel();

// Pass the callback to handle the response
restRequest.readInto(channel, parseRequestBodyAndDeleteCallback(channel, restRequest, deleteBlobHandler, restResponseChannel, callback));
}

/**
* This method will read the request body, deserialize it, and trigger the batch delete process.
*/
private Callback<Long> parseRequestBodyAndDeleteCallback(RetainingAsyncWritableChannel channel,
RestRequest restRequest, DeleteBlobHandler deleteBlobHandler, RestResponseChannel restResponseChannel,
Callback<ReadableStreamChannel> callback) {

return buildCallback(frontendMetrics.deleteBlobRouterMetrics, bytesRead -> {
if (bytesRead == null) {
// Handle the case where bytesRead is null
throw new IllegalStateException("bytesRead is null");
}
// Deserialize the request body into a S3BatchDeleteObjects
S3MessagePayload.S3BatchDeleteObjects deleteRequest = deserializeRequest(channel);
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
System.out.println("here at callback " + deleteRequest);

allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
if (deleteRequest != null) {
// Extract request path
RequestPath requestPath = (RequestPath) restRequest.getArgs().get(RestUtils.InternalKeys.REQUEST_PATH);

// Process each delete key in the batch
for (S3MessagePayload.S3BatchDeleteKeys object : deleteRequest.getObjects()) {
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved

// Construct the delete path for each object
String singleDeletePath = requestPath.getPathAfterPrefixes() + "/" + object.getKey();

// Create a new RequestPath for the delete operation
List<String> emptyList = new ArrayList<>();
RequestPath newRequestPath =
RequestPath.parse(singleDeletePath, restRequest.getArgs(), emptyList, requestPath.getClusterName());
WrappedRestRequest singleDeleteRequest = new WrappedRestRequest(restRequest);
singleDeleteRequest.setArg(RestUtils.InternalKeys.REQUEST_PATH, newRequestPath);

NoOpResponseChannel noOpResponseChannel = new NoOpResponseChannel();
CountDownLatch latch = new CountDownLatch(1);

// Handle the delete operation using the deleteBlobHandler
deleteBlobHandler.handle(singleDeleteRequest, noOpResponseChannel, new Callback<Void>() {
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void onCompletion(Void result, Exception exception) {
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
// Call our custom onDeleteCompletion to track success/failure
boolean success = exception == null;
onDeleteCompletion(success, object.getKey());

// Decrement latch to signal that this delete operation has finished
latch.countDown();
}
});

// Wait until all deletes are completed
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Check if any delete request failed
if (failedRequest) {
restResponseChannel.setStatus(ResponseStatus.BadRequest);
callback.onCompletion(null, new RestServiceException("Failed to execute S3BatchDelete", RestServiceErrorCode.BadRequest));
} else {
// Successful response handling
restResponseChannel.setStatus(ResponseStatus.Ok);
try {
// TODO: move to separate method ?
XmlMapper xmlMapper = new XmlMapper();
S3MessagePayload.S3BatchDeleteResponse resp = new S3MessagePayload.S3BatchDeleteResponse();
resp.setDeleted(deleted);
resp.setErrors(errors);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
xmlMapper.writeValue(outputStream, resp);
ReadableStreamChannel readableStreamChannel =
new ByteBufferReadableStreamChannel(ByteBuffer.wrap(outputStream.toByteArray()));
restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, readableStreamChannel.getSize());
restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, XML_CONTENT_TYPE);
callback.onCompletion(readableStreamChannel, null);
} catch (IOException e) {
callback.onCompletion(null, new RestServiceException("Failed to serialize response", RestServiceErrorCode.InternalServerError));
}
}
}
}, restRequest.getUri(), LOGGER, callback);
}

/**
* This method is used to update the lists of deleted and errored objects.
* @param success whether the delete was successful or not
* @param key the object key
*/
public void onDeleteCompletion(boolean success, String key) {
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
if (success) {
deleted.add(key);
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
} else {
errors.add(key);
}
}

/**
* Deserialize the request body into an S3BatchDeleteObjects.
*/
public S3MessagePayload.S3BatchDeleteObjects deserializeRequest(RetainingAsyncWritableChannel channel) {
S3MessagePayload.S3BatchDeleteObjects deleteRequest = null;
ByteBuf byteBuffer = channel.consumeContentAsByteBuf();
byte[] byteArray = new byte[byteBuffer.readableBytes()];
byteBuffer.readBytes(byteArray);

XmlMapper xmlMapper = new XmlMapper();
try {
deleteRequest = xmlMapper.readValue(byteArray, S3MessagePayload.S3BatchDeleteObjects.class);
} catch (IOException e) {
failedRequest = true;
}
return deleteRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class S3PostHandler extends S3BaseHandler<ReadableStreamChannel> {
/**
* Constructs a handler for uploading s3 POST requests.
*/
public S3PostHandler(S3MultipartUploadHandler multipartPostHandler) {
public S3PostHandler(S3MultipartUploadHandler multipartPostHandler, S3BatchDeleteHandler s3BatchDeleteHandler) {
this.multipartPostHandler = multipartPostHandler;
}

Expand All @@ -48,6 +48,8 @@ protected void doHandle(RestRequest restRequest, RestResponseChannel restRespons
if (S3MultipartUploadHandler.isMultipartCreateUploadRequest(restRequest)
|| S3MultipartUploadHandler.isMultipartCompleteUploadRequest(restRequest)) {
multipartPostHandler.handle(restRequest, restResponseChannel, callback);
} else if (isBatchDelete(restRequest)) {
// Placeholder for handling batch delete requests in the future.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's call s3BatchDeleteHandler here.

} else {
// Placeholder for handling any non-multipart S3 POST requests in the future.
throw new RestServiceException("Unsupported S3 POST request", RestServiceErrorCode.BadRequest);
Expand Down
Loading
Loading