addressed comments from latest PR
allenaverbukh committed Jan 17, 2025
1 parent a91a723 commit 6a51203
Showing 9 changed files with 151 additions and 115 deletions.
@@ -152,8 +152,6 @@ private Callback<Void> securityPostProcessRequestCallback() {
restRequest.setArg(InternalKeys.REQUEST_PATH, newRequestPath);
}
}
////// HERE

router.deleteBlob(restRequest, null, serviceId, routerCallback(),
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, false));
}, restRequest.getUri(), LOGGER, finalCallback);
@@ -242,7 +242,7 @@ public void start() throws InstantiationException {
new S3MultipartUploadHandler(securityService, frontendMetrics, accountAndContainerInjector, frontendConfig,
namedBlobDb, idConverter, router, quotaManager);
s3DeleteHandler = new S3DeleteHandler(deleteBlobHandler, s3MultipartUploadHandler, frontendMetrics);
s3BatchDeleteHandler = new S3BatchDeleteHandler(deleteBlobHandler, s3BatchDeleteHandler, frontendMetrics);
s3BatchDeleteHandler = new S3BatchDeleteHandler(deleteBlobHandler, frontendMetrics);
s3PostHandler = new S3PostHandler(s3MultipartUploadHandler, s3BatchDeleteHandler);
s3PutHandler = new S3PutHandler(namedBlobPutHandler, s3MultipartUploadHandler, frontendMetrics);
s3ListHandler = new S3ListHandler(namedBlobListHandler, frontendMetrics);
@@ -44,7 +44,7 @@
* require a response body, otherwise it is {@link ReadableStreamChannel}.
*/
abstract public class S3BaseHandler<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(S3BaseHandler.class);
protected static final Logger LOGGER = LoggerFactory.getLogger(S3BaseHandler.class);

/**
* Handles the S3 request and constructs the response.
@@ -77,7 +77,7 @@ public void handle(RestRequest restRequest, RestResponseChannel restResponseChan
LOGGER.debug("{} {}", restRequest.getRestMethod(), path);

doHandle(restRequest, restResponseChannel, CallbackUtils.chainCallback(callback, (r) -> {
removeAmbryHeaders (restResponseChannel);
removeAmbryHeaders(restResponseChannel);
callback.onCompletion(r, null);
}));
} catch (Throwable t) {
@@ -20,137 +20,120 @@
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static com.github.ambry.frontend.FrontendUtils.*;
import static com.github.ambry.rest.RestUtils.*;


public class S3BatchDeleteHandler extends S3BaseHandler<ReadableStreamChannel> {
private static DeleteBlobHandler deleteBlobHandler;
private ArrayList<String> deleted = new ArrayList<>();
private ArrayList<String> errors = new ArrayList<>();
private boolean failedRequest;
private FrontendMetrics frontendMetrics;

// S3PostHandler -> S3BatchDeleteHandler -> S3DeleteHandler -> S3DeleteObjectHandler -> DeleteBlobHandler
public S3BatchDeleteHandler(DeleteBlobHandler deleteBlobHandler, S3BatchDeleteHandler s3DeleteHandler, FrontendMetrics frontendMetrics) {
this.deleteBlobHandler = deleteBlobHandler;
this.failedRequest = false;
}

/**
* Callback for processing batch delete requests.
*/
private class BatchDeleteCallback implements Callback<Long> {
private final RetainingAsyncWritableChannel channel;
private final RestRequest restRequest;
private final DeleteBlobHandler deleteBlobHandler;

public BatchDeleteCallback(RetainingAsyncWritableChannel channel, RestRequest restRequest, DeleteBlobHandler deleteBlobHandler) {
this.channel = channel;
this.restRequest = restRequest;
this.deleteBlobHandler = deleteBlobHandler;
// Constructor
public S3BatchDeleteHandler(DeleteBlobHandler deleteBlobHandler, FrontendMetrics frontendMetrics) {
if (frontendMetrics == null) {
throw new IllegalStateException("FrontendMetrics should not be null");
}

public void onDeleteCompletion(boolean success, String key) {
if (success) {
deleted.add(key);
}
else {
errors.add(key);
}
if (frontendMetrics.deleteBlobRouterMetrics == null) {
throw new IllegalStateException("deleteBlobRouterMetrics should not be null");
}

@Override
public void onCompletion(Long respLong, Exception exception) {
if (exception == null) {
// Data read successfully
try {
// Get the retained content from the channel
ByteBuf byteBuffer = channel.consumeContentAsByteBuf();
// TODO byte array?
// Convert ByteBuf to byte array
byte[] byteArray = new byte[byteBuffer.readableBytes()];
byteBuffer.readBytes(byteArray);

// Deserialize into S3BatchDeleteObjects
XmlMapper xmlMapper = new XmlMapper();
S3MessagePayload.S3BatchDeleteObjects deleteRequest = xmlMapper.readValue(byteArray, S3MessagePayload.S3BatchDeleteObjects.class);

// Process each delete key in the batch
for (S3MessagePayload.S3BatchDeleteKeys object : deleteRequest.getObjects()) {
RequestPath requestPath = (RequestPath) restRequest.getArgs().get(RestUtils.InternalKeys.REQUEST_PATH);

// Construct the delete path
// TODO: confirm that getPathAfterPrefixes is indeed "/named/application/container"
String singleDeletePath = requestPath.getPathAfterPrefixes() + "/" + object.getKey();

// Create a new RequestPath for the delete operation
List<String> emptyList = new ArrayList<>();
RequestPath newRequestPath = RequestPath.parse(singleDeletePath, restRequest.getArgs(), emptyList, requestPath.getClusterName());
WrappedRestRequest singleDeleteRequest = new WrappedRestRequest(restRequest);
singleDeleteRequest.setArg(RestUtils.InternalKeys.REQUEST_PATH, newRequestPath);

CountDownLatch latch = new CountDownLatch(1);
NoOpResponseChannel noOpResponseChannel = new NoOpResponseChannel();

deleteBlobHandler.handle(singleDeleteRequest, noOpResponseChannel, new Callback<Void>() {
@Override
public void onCompletion(Void result, Exception exception) {
// Call our custom onDeleteCompletion to track success/failure
boolean success = exception == null;
onDeleteCompletion(success, object.getKey());
latch.countDown();
}
});

try {
latch.await(); // This will block until latch count reaches zero
} catch (InterruptedException e) {
// TODO: what to do here
Thread.currentThread().interrupt();
}
}
} catch (IOException e) {
// Handle exceptions during deserialization
failedRequest = true;
} catch (RestServiceException e) {
failedRequest = true;
}
} else {
// failed to read data
failedRequest = true;
}
}
this.deleteBlobHandler = deleteBlobHandler;
this.failedRequest = false;
this.frontendMetrics = frontendMetrics;
}

/**
* Handles the S3 request and construct the response.
* Handles the S3 request and constructs the response.
*
* @param restRequest the {@link RestRequest} that contains the request headers and body.
* @param restResponseChannel the {@link RestResponseChannel} that contains the response headers and body.
* @param callback the {@link Callback} to invoke when the response is ready (or if there is an exception).
* @throws RestServiceException exception when the processing fails
*/
@Override
protected void doHandle(RestRequest restRequest, RestResponseChannel restResponseChannel, Callback<ReadableStreamChannel> callback)
throws RestServiceException {
protected void doHandle(RestRequest restRequest, RestResponseChannel restResponseChannel,
Callback<ReadableStreamChannel> callback) throws RestServiceException {

// Create the channel to read the request body
RetainingAsyncWritableChannel channel = new RetainingAsyncWritableChannel();

// Pass the callback to handle the response
restRequest.readInto(channel, parseRequestBodyAndDeleteCallback(channel, restRequest, deleteBlobHandler, restResponseChannel, callback));
}

// TODO determine if we need to define max size of channel
// Create the channel to read the request body
RetainingAsyncWritableChannel channel = new RetainingAsyncWritableChannel();
/**
* This method will read the request body, deserialize it, and trigger the batch delete process.
*/
private Callback<Long> parseRequestBodyAndDeleteCallback(RetainingAsyncWritableChannel channel,
RestRequest restRequest, DeleteBlobHandler deleteBlobHandler, RestResponseChannel restResponseChannel,
Callback<ReadableStreamChannel> callback) {

return buildCallback(frontendMetrics.deleteBlobRouterMetrics, bytesRead -> {
if (bytesRead == null) {
// Handle the case where bytesRead is null
throw new IllegalStateException("bytesRead is null");
}
// Deserialize the request body into a S3BatchDeleteObjects
S3MessagePayload.S3BatchDeleteObjects deleteRequest = deserializeRequest(channel);
LOGGER.debug("Parsed batch delete request: {}", deleteRequest);

if (deleteRequest != null) {
// Extract request path
RequestPath requestPath = (RequestPath) restRequest.getArgs().get(RestUtils.InternalKeys.REQUEST_PATH);

// Process each delete key in the batch
for (S3MessagePayload.S3BatchDeleteKeys object : deleteRequest.getObjects()) {

// Construct the delete path for each object
String singleDeletePath = requestPath.getPathAfterPrefixes() + "/" + object.getKey();

// Create a new RequestPath for the delete operation
List<String> emptyList = new ArrayList<>();
RequestPath newRequestPath =
RequestPath.parse(singleDeletePath, restRequest.getArgs(), emptyList, requestPath.getClusterName());
WrappedRestRequest singleDeleteRequest = new WrappedRestRequest(restRequest);
singleDeleteRequest.setArg(RestUtils.InternalKeys.REQUEST_PATH, newRequestPath);

NoOpResponseChannel noOpResponseChannel = new NoOpResponseChannel();
CountDownLatch latch = new CountDownLatch(1);

// Handle the delete operation using the deleteBlobHandler
deleteBlobHandler.handle(singleDeleteRequest, noOpResponseChannel, new Callback<Void>() {
@Override
public void onCompletion(Void result, Exception exception) {
// Call our custom onDeleteCompletion to track success/failure
boolean success = exception == null;
onDeleteCompletion(success, object.getKey());

// Decrement latch to signal that this delete operation has finished
latch.countDown();
}
});

// Create and pass the BatchDeleteCallback to handle the response
restRequest.readInto(channel, new BatchDeleteCallback(channel, restRequest, deleteBlobHandler));
// Wait until all deletes are completed
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

// Check if any delete request failed
if (failedRequest) {
restResponseChannel.setStatus(ResponseStatus.BadRequest);
throw new RestServiceException("Failed to execute S3BatchDelete", RestServiceErrorCode.BadRequest);
callback.onCompletion(null, new RestServiceException("Failed to execute S3BatchDelete", RestServiceErrorCode.BadRequest));
} else {
// Successful response handling
restResponseChannel.setStatus(ResponseStatus.Ok);
try {
// TODO: move to separate method ?
XmlMapper xmlMapper = new XmlMapper();
S3MessagePayload.S3BatchDeleteResponse resp = new S3MessagePayload.S3BatchDeleteResponse();
resp.setDeleted(deleted);
@@ -162,13 +162,42 @@ protected void doHandle(RestRequest restRequest, RestResponseChannel restRespons
restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, readableStreamChannel.getSize());
restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, XML_CONTENT_TYPE);
callback.onCompletion(readableStreamChannel, null);

} catch (IOException e) {
throw new RestServiceException("Failed to serialize response", RestServiceErrorCode.InternalServerError);
callback.onCompletion(null, new RestServiceException("Failed to serialize response", RestServiceErrorCode.InternalServerError));
}
}
}
}, restRequest.getUri(), LOGGER, callback);
}
}


/**
* This method is used to update the lists of deleted and errored objects.
* @param success whether the delete was successful or not
* @param key the object key
*/
public void onDeleteCompletion(boolean success, String key) {
if (success) {
deleted.add(key);
} else {
errors.add(key);
}
}

/**
* Deserialize the request body into an S3BatchDeleteObjects.
*/
public S3MessagePayload.S3BatchDeleteObjects deserializeRequest(RetainingAsyncWritableChannel channel) {
S3MessagePayload.S3BatchDeleteObjects deleteRequest = null;
ByteBuf byteBuffer = channel.consumeContentAsByteBuf();
byte[] byteArray = new byte[byteBuffer.readableBytes()];
byteBuffer.readBytes(byteArray);

XmlMapper xmlMapper = new XmlMapper();
try {
deleteRequest = xmlMapper.readValue(byteArray, S3MessagePayload.S3BatchDeleteObjects.class);
} catch (IOException e) {
failedRequest = true;
}
return deleteRequest;
}
}
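
Note: the handler above waits on a CountDownLatch after issuing each per-key delete, so the keys in a batch are processed one at a time while the request-reading thread blocks in between. Below is a self-contained sketch of one possible alternative that fans out all deletes and aggregates completions with an atomic counter. Everything here (the `AsyncDeleter` interface, class and method names) is a hypothetical stand-in, not Ambry's `Callback`/`DeleteBlobHandler` API; it only illustrates the aggregation pattern.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class BatchDeleteAggregationSketch {

  // Hypothetical stand-in for an async per-key delete operation.
  interface AsyncDeleter {
    void delete(String key, BiConsumer<Void, Exception> onCompletion);
  }

  /**
   * Issues one async delete per key and calls onAllDone exactly once, after the last completion
   * arrives, with the keys that succeeded and the keys that failed.
   */
  static void deleteAll(List<String> keys, AsyncDeleter deleter,
      BiConsumer<Queue<String>, Queue<String>> onAllDone) {
    Queue<String> deleted = new ConcurrentLinkedQueue<>();
    Queue<String> errors = new ConcurrentLinkedQueue<>();
    if (keys.isEmpty()) {
      onAllDone.accept(deleted, errors);
      return;
    }
    AtomicInteger remaining = new AtomicInteger(keys.size());
    for (String key : keys) {
      deleter.delete(key, (ignored, exception) -> {
        (exception == null ? deleted : errors).add(key);
        if (remaining.decrementAndGet() == 0) {
          // The last completion to arrive triggers the aggregate response.
          onAllDone.accept(deleted, errors);
        }
      });
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // Toy deleter: "object2" fails, everything else succeeds.
    AsyncDeleter deleter = (key, onCompletion) -> pool.submit(
        () -> onCompletion.accept(null, "object2".equals(key) ? new RuntimeException("not found") : null));
    deleteAll(List.of("object1", "object2", "object3"), deleter, (ok, bad) -> {
      System.out.println("deleted=" + ok + " errors=" + bad);
      pool.shutdown();
    });
  }
}
```

The last completion fires the aggregate callback, which is roughly the point at which the handler's Deleted/Error XML lists would be serialized.
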
@@ -83,7 +83,6 @@ private void handle(RestRequest restRequest, RestResponseChannel restResponseCha
restResponseChannel.setStatus(ResponseStatus.NoContent);
finalCallback.onCompletion(null, null);
};

deleteBlobHandler.handle(restRequest, restResponseChannel, buildCallback(metrics.s3DeleteHandleMetrics,
successAction, restRequest.getUri(), LOGGER, finalCallback));
}
@@ -40,10 +40,13 @@
import com.github.ambry.router.ReadableStreamChannel;
import com.github.ambry.utils.TestUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Test;

@@ -78,9 +81,34 @@ public S3BatchDeleteHandlerTest() throws Exception {
@Test
public void deleteObjectTest() throws Exception {
// 1. Delete the object
String uri = String.format("/s3/%s/%s/%s", account.getName(), container.getName(), KEY_NAME);
String uri = String.format("/s3/%s/%s", account.getName(), container.getName());
// JSONObject batchDeleteRequest = new JSONObject();
// JSONArray objectsArray = new JSONArray();
// objectsArray.put(new JSONObject().put("Key", "object1"));
// objectsArray.put(new JSONObject().put("Key", "object2"));
// objectsArray.put(new JSONObject().put("Key", "object3"));
// batchDeleteRequest.put("Objects", objectsArray);
// String jsonBody = batchDeleteRequest.toString();
// byte[] jsonBytes = jsonBody.getBytes("UTF-8"); // Convert to byte array using UTF-8 encoding
// // 3. Wrap the byte array in a ByteBuffer
// ByteBuffer byteBuffer = ByteBuffer.wrap(jsonBytes);
// List<ByteBuffer> byteBuffers = new ArrayList<>();
// byteBuffers.add(byteBuffer);
String xmlBody = "<S3BatchDeleteObjects>" +
"<Object>" +
"<Key>object1</Key>" +
"</Object>" +
"<Object>" +
"<Key>object2</Key>" +
"</Object>" +
"<Object>" +
"<Key>object3</Key>" +
"</Object>" +
"</S3BatchDeleteObjects>";

byte[] xmlBytes = xmlBody.getBytes("UTF-8"); // Convert to byte array using UTF-8 encoding
RestRequest request =
FrontendRestRequestServiceTest.createRestRequest(RestMethod.DELETE, uri, new JSONObject(), null);
FrontendRestRequestServiceTest.createRestRequest(RestMethod.POST, uri, new JSONObject(), new LinkedList<>(Arrays.asList(ByteBuffer.wrap(xmlBytes), null)));
RestResponseChannel restResponseChannel = new MockRestResponseChannel();
request.setArg(RestUtils.InternalKeys.REQUEST_PATH,
RequestPath.parse(request, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME));
Expand Down Expand Up @@ -117,8 +145,7 @@ private void setup() throws Exception {
DeleteBlobHandler deleteBlobHandler =
new DeleteBlobHandler(router, securityService, ambryIdConverterFactory.getIdConverter(), injector, metrics,
new MockClusterMap(), QuotaTestUtils.createDummyQuotaManager(), ACCOUNT_SERVICE);
// s3DeleteHandler = new S3DeleteHandler(deleteBlobHandler, null, metrics);
s3BatchDeleteHandler = new S3BatchDeleteHandler(deleteBlobHandler, null, metrics);
s3BatchDeleteHandler = new S3BatchDeleteHandler(deleteBlobHandler, metrics);
}

private void putABlob() throws Exception {
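
For reference, the XML body that the test above concatenates by hand can also be produced and parsed with Jackson's XmlMapper. The sketch below is self-contained and uses small hypothetical POJOs that mirror the element names in the test XML (S3BatchDeleteObjects/Object/Key); they are illustrations only, not the real S3MessagePayload classes.

```java
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty;
import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class BatchDeleteXmlSketch {

  // Hypothetical request POJO mirroring <S3BatchDeleteObjects><Object><Key>...</Key></Object>...</S3BatchDeleteObjects>.
  @JacksonXmlRootElement(localName = "S3BatchDeleteObjects")
  public static class BatchDeleteObjects {
    @JacksonXmlElementWrapper(useWrapping = false)
    @JacksonXmlProperty(localName = "Object")
    public List<KeyEntry> objects;
  }

  public static class KeyEntry {
    @JacksonXmlProperty(localName = "Key")
    public String key;

    public KeyEntry() { }

    public KeyEntry(String key) { this.key = key; }
  }

  public static void main(String[] args) throws Exception {
    XmlMapper xmlMapper = new XmlMapper();

    // Serialize: build the request body programmatically instead of concatenating strings.
    BatchDeleteObjects request = new BatchDeleteObjects();
    request.objects = Arrays.asList(new KeyEntry("object1"), new KeyEntry("object2"), new KeyEntry("object3"));
    byte[] xmlBytes = xmlMapper.writeValueAsBytes(request);
    System.out.println(new String(xmlBytes, StandardCharsets.UTF_8));

    // Deserialize: the same direction the handler's deserializeRequest takes with the real payload classes.
    BatchDeleteObjects parsed = xmlMapper.readValue(xmlBytes, BatchDeleteObjects.class);
    parsed.objects.forEach(entry -> System.out.println(entry.key));
  }
}
```
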
@@ -40,7 +40,7 @@ public BatchDeletePartitionRequestInfo(PartitionId partitionId, List<BlobId> blo
for (BlobId id : blobIds) {
totalBlobIdsSizeInBytes += id.sizeInBytes();
if (!partitionId.toPathString().equals(id.getPartition().toPathString())) {
throw new IllegalArgumentException("Not all blob IDs in S3BatchDeleteRequest are from the same partition.");
throw new IllegalArgumentException("Not all blob IDs in BatchDeleteRequest are from the same partition.");
}
}
}
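
BatchDeletePartitionRequestInfo rejects blob IDs whose partition path differs from the partition it was constructed with, so a caller is expected to group IDs by partition first. A minimal sketch of that grouping step follows; it relies only on the BlobId#getPartition() and PartitionId#toPathString() calls visible in the diff above, and the import path, class name, and method name are assumptions rather than existing Ambry code.

```java
// Hypothetical grouping helper; not part of this change.
import com.github.ambry.commons.BlobId; // assumed package
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BatchDeleteGroupingSketch {
  /**
   * Groups blob IDs by their partition path so that each group can back one
   * BatchDeletePartitionRequestInfo without tripping its same-partition check.
   */
  static Map<String, List<BlobId>> groupByPartitionPath(List<BlobId> blobIds) {
    Map<String, List<BlobId>> byPartition = new LinkedHashMap<>();
    for (BlobId id : blobIds) {
      byPartition.computeIfAbsent(id.getPartition().toPathString(), path -> new ArrayList<>()).add(id);
    }
    return byPartition;
  }
}
```
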
@@ -58,7 +58,7 @@ protected BatchDeleteRequest(int correlationId, String clientId,
super(RequestOrResponseType.BatchDeleteRequest, version, correlationId, clientId);
this.batchDeletePartitionRequestInfos = batchDeletePartitionRequestInfos;
if (batchDeletePartitionRequestInfos == null) {
throw new IllegalArgumentException("No partition info specified in S3BatchDeleteRequest");
throw new IllegalArgumentException("No partition info specified in BatchDeleteRequest");
}
for (BatchDeletePartitionRequestInfo partitionRequestInfo : batchDeletePartitionRequestInfos) {
totalPartitionRequestInfoListSizeInBytes += partitionRequestInfo.sizeInBytes();
@@ -127,7 +127,7 @@ public long sizeInBytes() {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("S3BatchDeleteRequest[");
sb.append("BatchDeleteRequest[");
for (BatchDeletePartitionRequestInfo partitionRequestInfo : batchDeletePartitionRequestInfos) {
sb.append(partitionRequestInfo.toString());
}