Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable S3BatchDelete #2965

Merged
merged 27 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6ac91f2
[WIP] s3BatchDelete
snalli Dec 13, 2024
e9ea4a1
initial method for parsing objects from request
allenaverbukh Dec 19, 2024
5b0bf21
attempted to formulate new path for newrequest of single delete
allenaverbukh Dec 20, 2024
aa43702
current version
allenaverbukh Jan 3, 2025
a268efa
moved code to callback
allenaverbukh Jan 3, 2025
2280b8c
populated response via xmlmapper, exception handling
allenaverbukh Jan 14, 2025
a91a723
added latest
allenaverbukh Jan 15, 2025
6a51203
addressed comments from latest PR
allenaverbukh Jan 17, 2025
5716b02
UT
allenaverbukh Jan 21, 2025
0b8de6e
modified xml input
allenaverbukh Jan 22, 2025
05de8f8
block at very end
allenaverbukh Jan 23, 2025
5cd2258
changed response XML and updated UT
allenaverbukh Jan 27, 2025
3b33691
modified code to be of type int
allenaverbukh Jan 28, 2025
782894c
removed todo
allenaverbukh Jan 28, 2025
86bea0a
addressed pr comments
allenaverbukh Jan 28, 2025
7741411
final cleanup
allenaverbukh Jan 28, 2025
8399cf0
formatting
allenaverbukh Jan 28, 2025
ba6665f
refined final comments
allenaverbukh Jan 29, 2025
346853b
removed s from keys
allenaverbukh Jan 29, 2025
d6fe619
removed extra space
allenaverbukh Jan 29, 2025
21935d9
deleted extra file
allenaverbukh Jan 30, 2025
c1446db
added constant to correct file
allenaverbukh Jan 30, 2025
dec9426
changed xml to be nonconcurrent
allenaverbukh Jan 30, 2025
f26fb82
removed all extra logger statements
allenaverbukh Jan 30, 2025
d1a38f4
address more comments
allenaverbukh Jan 30, 2025
a6f1d10
updated tests
allenaverbukh Jan 30, 2025
ea23ccc
final modifications
allenaverbukh Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ public static NamedBlobPath parse(String path, Map<String, Object> args) throws
* @return the {@link NamedBlobPath} that indicates the parsing result from blobUrl.
* @throws RestServiceException on parsing errors.
*/

// account/container/key
// ->
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
// account/container/? delete
// namedblobb requires key and prefix
// alter this method since it expects key or prefix but we dont have either

public static NamedBlobPath parseS3(String path, Map<String, Object> args) throws RestServiceException {
path = path.startsWith("/") ? path.substring(1) : path;
String[] splitPath = path.split("/", 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,59 @@ public String toString() {
return "Bucket=" + bucket + ", Key=" + key + ", UploadId=" + uploadId;
}
}

public static class S3BatchDeleteObjects {
@JacksonXmlProperty(localName = "Object")
private static List<S3BatchDeleteKeys> objects;

public static List<S3BatchDeleteKeys> getObjects() {
return objects;
}

@Override
public String toString() {
return "Objects=" + objects;
}
}

public static class S3BatchDeleteKeys {
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
@JacksonXmlProperty(localName = "Key") // Maps to the <Key> element
private String key;

public String getKey() {
return key;
}

@Override
public String toString() {
return "Key=" + key;
}

}

public static class S3BatchDeleteResponse {
@JacksonXmlProperty(localName = "deleted")
private List<String> deleted;

@JacksonXmlProperty(localName = "errors")
private List<String> success;
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved

// Getters and Setters
public List<String> getDeleted() {
return deleted;
}

public void setDeleted(List<String> deleted) {
this.deleted = deleted;
}

public List<String> getErrors() {
return success;
}

public void setErrors(List<String> success) {
this.success = success;
}
}

}
15 changes: 15 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/rest/RequestPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ public static RequestPath parse(RestRequest restRequest, List<String> prefixesTo
* this path segment.
* @return a {@link RequestPath} object.
*/

allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved


// string path = /named/account/container/key
// getargs
// []
//


public static RequestPath parse(String path, Map<String, Object> args, List<String> prefixesToRemove,
String clusterName) throws RestServiceException {
int offset = 0;
Expand Down Expand Up @@ -312,6 +321,12 @@ private static int matchPathSegments(String path, int pathOffset, String pathSeg
return nextPathSegmentOffset;
}


// s3/account/container/key is for delete (prior to conversion) ---> s3 to named
// /s3/[account_name]/[container_name]? for batch delete



/**
* Get named blob path from S3 request path
* @param path s3 request path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class RestUtils {
public static final String PATH_SEPARATOR_STRING = "/";
public static final String STITCH = "STITCH";
public static final String UPLOADS_QUERY_PARAM = "uploads";
public static final String BATCH_DELETE_QUERY_PARAM = "delete";
public static final String UPLOAD_ID_QUERY_PARAM = "uploadId";
public static final String CONTINUE = "100-continue";
public static final String OBJECT_LOCK_PARAM = "object-lock";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private CallbackChain(RestRequest restRequest, RestResponseChannel restResponseC
/**
* Start the chain by calling {@link SecurityService#processRequest}.
*/
private void start() {
private void start() {
restRequest.setArg(RestUtils.InternalKeys.KEEP_ALIVE_ON_ERROR_HINT, true);
securityService.processRequest(restRequest, securityProcessRequestCallback());
}
Expand Down Expand Up @@ -152,6 +152,8 @@ private Callback<Void> securityPostProcessRequestCallback() {
restRequest.setArg(InternalKeys.REQUEST_PATH, newRequestPath);
}
}
////// HERE

router.deleteBlob(restRequest, null, serviceId, routerCallback(),
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, false));
}, restRequest.getUri(), LOGGER, finalCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.commons.Callback;
import com.github.ambry.config.FrontendConfig;
import com.github.ambry.frontend.s3.S3BatchDeleteHandler;
import com.github.ambry.frontend.s3.S3DeleteHandler;
import com.github.ambry.frontend.s3.S3GetHandler;
import com.github.ambry.frontend.s3.S3HeadHandler;
Expand Down Expand Up @@ -113,6 +114,7 @@ class FrontendRestRequestService implements RestRequestService {
private PostDatasetsHandler postDatasetsHandler;
private GetStatsReportHandler getStatsReportHandler;
private S3DeleteHandler s3DeleteHandler;
private S3BatchDeleteHandler s3BatchDeleteHandler;
private S3ListHandler s3ListHandler;
private S3PutHandler s3PutHandler;
private S3HeadHandler s3HeadHandler;
Expand Down Expand Up @@ -240,7 +242,8 @@ public void start() throws InstantiationException {
new S3MultipartUploadHandler(securityService, frontendMetrics, accountAndContainerInjector, frontendConfig,
namedBlobDb, idConverter, router, quotaManager);
s3DeleteHandler = new S3DeleteHandler(deleteBlobHandler, s3MultipartUploadHandler, frontendMetrics);
s3PostHandler = new S3PostHandler(s3MultipartUploadHandler);
s3BatchDeleteHandler = new S3BatchDeleteHandler(deleteBlobHandler, s3BatchDeleteHandler, frontendMetrics);
s3PostHandler = new S3PostHandler(s3MultipartUploadHandler, s3BatchDeleteHandler);
s3PutHandler = new S3PutHandler(namedBlobPutHandler, s3MultipartUploadHandler, frontendMetrics);
s3ListHandler = new S3ListHandler(namedBlobListHandler, frontendMetrics);
s3GetHandler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void handle(RestRequest restRequest, RestResponseChannel restResponseChan
LOGGER.debug("{} {}", restRequest.getRestMethod(), path);

doHandle(restRequest, restResponseChannel, CallbackUtils.chainCallback(callback, (r) -> {
removeAmbryHeaders(restResponseChannel);
removeAmbryHeaders (restResponseChannel);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary change.

callback.onCompletion(r, null);
}));
} catch (Throwable t) {
Expand Down Expand Up @@ -122,6 +122,11 @@ public static boolean isMultipartCreateUploadRequest(RestRequest restRequest) {
&& restRequest.getArgs().containsKey(UPLOADS_QUERY_PARAM);
}

public static boolean isBatchDelete(RestRequest restRequest) {
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
return restRequest.getRestMethod() == RestMethod.POST && restRequest.getArgs().containsKey(S3_REQUEST)
&& restRequest.getArgs().containsKey(BATCH_DELETE_QUERY_PARAM);
}

/**
* @param restRequest the {@link RestRequest} that contains the request parameters.
* @return {@code True} if it is a completion/abortion of multipart uploads.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package com.github.ambry.frontend.s3;
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved

import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import com.github.ambry.commons.ByteBufferReadableStreamChannel;
import com.github.ambry.commons.Callback;
import com.github.ambry.frontend.DeleteBlobHandler;
import com.github.ambry.frontend.FrontendMetrics;
import com.github.ambry.rest.NoOpResponseChannel;
import com.github.ambry.rest.RequestPath;
import com.github.ambry.rest.ResponseStatus;
import com.github.ambry.rest.RestRequest;
import com.github.ambry.rest.RestResponseChannel;
import com.github.ambry.commons.RetainingAsyncWritableChannel;
import com.github.ambry.rest.RestServiceErrorCode;
import com.github.ambry.rest.RestServiceException;
import com.github.ambry.rest.RestUtils;
import com.github.ambry.rest.WrappedRestRequest;
import com.github.ambry.router.ReadableStreamChannel;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static com.github.ambry.rest.RestUtils.*;


public class S3BatchDeleteHandler extends S3BaseHandler<ReadableStreamChannel> {
private static DeleteBlobHandler deleteBlobHandler;
private ArrayList<String> deleted = new ArrayList<>();
private ArrayList<String> errors = new ArrayList<>();
private boolean failedRequest;

// S3PostHandler -> S3BatchDeleteHandler -> S3DeleteHandler -> S3DeleteObjectHandler -> DeleteBlobHandler
public S3BatchDeleteHandler(DeleteBlobHandler deleteBlobHandler, S3BatchDeleteHandler s3DeleteHandler, FrontendMetrics frontendMetrics) {
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
this.deleteBlobHandler = deleteBlobHandler;
this.failedRequest = false;
}
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved

/**
* Callback for processing batch delete requests.
*/
private class BatchDeleteCallback implements Callback<Long> {
private final RetainingAsyncWritableChannel channel;
private final RestRequest restRequest;
private final DeleteBlobHandler deleteBlobHandler;

public BatchDeleteCallback(RetainingAsyncWritableChannel channel, RestRequest restRequest, DeleteBlobHandler deleteBlobHandler) {
this.channel = channel;
this.restRequest = restRequest;
this.deleteBlobHandler = deleteBlobHandler;
}

allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
public void onDeleteCompletion(boolean success, String key) {
if (success) {
deleted.add(key);
}
else {
errors.add(key);
}
}

@Override
public void onCompletion(Long respLong, Exception exception) {
if (exception == null) {
// Data read successfully
try {
// Get the retained content from the channel
ByteBuf byteBuffer = channel.consumeContentAsByteBuf();
// TODO byte array?
// Convert ByteBuf to byte array
byte[] byteArray = new byte[byteBuffer.readableBytes()];
byteBuffer.readBytes(byteArray);

// Deserialize into S3BatchDeleteObjects
XmlMapper xmlMapper = new XmlMapper();
S3MessagePayload.S3BatchDeleteObjects deleteRequest = xmlMapper.readValue(byteArray, S3MessagePayload.S3BatchDeleteObjects.class);
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved

// Process each delete key in the batch
for (S3MessagePayload.S3BatchDeleteKeys object : deleteRequest.getObjects()) {
RequestPath requestPath = (RequestPath) restRequest.getArgs().get(RestUtils.InternalKeys.REQUEST_PATH);
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved

// Construct the delete path
// TODO: confirm that getPathAfterPrefixes is indeed "/named/application/container"
String singleDeletePath = requestPath.getPathAfterPrefixes() + "/" + object.getKey();

// Create a new RequestPath for the delete operation
List<String> emptyList = new ArrayList<>();
RequestPath newRequestPath = RequestPath.parse(singleDeletePath, restRequest.getArgs(), emptyList, requestPath.getClusterName());
WrappedRestRequest singleDeleteRequest = new WrappedRestRequest(restRequest);
singleDeleteRequest.setArg(RestUtils.InternalKeys.REQUEST_PATH, newRequestPath);

CountDownLatch latch = new CountDownLatch(1);
NoOpResponseChannel noOpResponseChannel = new NoOpResponseChannel();

deleteBlobHandler.handle(singleDeleteRequest, noOpResponseChannel, new Callback<Void>() {
@Override
public void onCompletion(Void result, Exception exception) {
// Call our custom onDeleteCompletion to track success/failure
boolean success = exception == null;
onDeleteCompletion(success, object.getKey());
latch.countDown();
}
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
});

try {
latch.await(); // This will block until latch count reaches zero
} catch (InterruptedException e) {
// TODO: what to do here
Thread.currentThread().interrupt();
}
}
} catch (IOException e) {
// Handle exceptions during deserialization
failedRequest = true;
} catch (RestServiceException e) {
failedRequest = true;
}
} else {
// failed to read data
failedRequest = true;
}
}
}

/**
* Handles the S3 request and construct the response.
*
* @param restRequest the {@link RestRequest} that contains the request headers and body.
* @param restResponseChannel the {@link RestResponseChannel} that contains the response headers and body.
* @param callback the {@link Callback} to invoke when the response is ready (or if there is an exception).
* @throws RestServiceException exception when the processing fails
*/
@Override
protected void doHandle(RestRequest restRequest, RestResponseChannel restResponseChannel, Callback<ReadableStreamChannel> callback)
throws RestServiceException {

// TODO determine if we need to define max size of chanel
// Create the channel to read the request body
RetainingAsyncWritableChannel channel = new RetainingAsyncWritableChannel();

// Create and pass the BatchDeleteCallback to handle the response
restRequest.readInto(channel, new BatchDeleteCallback(channel, restRequest, deleteBlobHandler));

if (failedRequest) {
restResponseChannel.setStatus(ResponseStatus.BadRequest);
throw new RestServiceException("Failed to execute S3BatchDelete", RestServiceErrorCode.BadRequest);
} else {
restResponseChannel.setStatus(ResponseStatus.Ok);
try {
XmlMapper xmlMapper = new XmlMapper();
S3MessagePayload.S3BatchDeleteResponse resp = new S3MessagePayload.S3BatchDeleteResponse();
resp.setDeleted(deleted);
resp.setErrors(errors);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
xmlMapper.writeValue(outputStream, resp);
ReadableStreamChannel readableStreamChannel =
new ByteBufferReadableStreamChannel(ByteBuffer.wrap(outputStream.toByteArray()));
restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, readableStreamChannel.getSize());
restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, XML_CONTENT_TYPE);
callback.onCompletion(readableStreamChannel, null);

} catch (IOException e) {
throw new RestServiceException("Failed to serialize response", RestServiceErrorCode.InternalServerError);
}
}
allenaverbukh marked this conversation as resolved.
Show resolved Hide resolved
}
}



Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ private void handle(RestRequest restRequest, RestResponseChannel restResponseCha
restResponseChannel.setStatus(ResponseStatus.NoContent);
finalCallback.onCompletion(null, null);
};

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary empty line.

deleteBlobHandler.handle(restRequest, restResponseChannel, buildCallback(metrics.s3DeleteHandleMetrics,
successAction, restRequest.getUri(), LOGGER, finalCallback));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class S3PostHandler extends S3BaseHandler<ReadableStreamChannel> {
/**
* Constructs a handler for uploading s3 POST requests.
*/
public S3PostHandler(S3MultipartUploadHandler multipartPostHandler) {
public S3PostHandler(S3MultipartUploadHandler multipartPostHandler, S3BatchDeleteHandler s3BatchDeleteHandler) {
this.multipartPostHandler = multipartPostHandler;
}

Expand All @@ -48,6 +48,8 @@ protected void doHandle(RestRequest restRequest, RestResponseChannel restRespons
if (S3MultipartUploadHandler.isMultipartCreateUploadRequest(restRequest)
|| S3MultipartUploadHandler.isMultipartCompleteUploadRequest(restRequest)) {
multipartPostHandler.handle(restRequest, restResponseChannel, callback);
} else if (isBatchDelete(restRequest)) {
// Placeholder for handling batch delete requests in the future.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's call s3BatchDeleteHandler here.

} else {
// Placeholder for handling any non-multipart S3 POST requests in the future.
throw new RestServiceException("Unsupported S3 POST request", RestServiceErrorCode.BadRequest);
Expand Down
Loading
Loading