diff --git a/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobPath.java b/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobPath.java index b080797d4c..0987d82bf8 100644 --- a/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobPath.java +++ b/ambry-api/src/main/java/com/github/ambry/frontend/NamedBlobPath.java @@ -93,6 +93,7 @@ public static NamedBlobPath parseS3(String path, Map args) throw path = path.startsWith("/") ? path.substring(1) : path; String[] splitPath = path.split("/", 4); String blobNamePrefix = RestUtils.getHeader(args, PREFIX_PARAM, false); + boolean isBatchDelete = args.containsKey(BATCH_DELETE_QUERY_PARAM); boolean isGetObjectLockRequest = args.containsKey(OBJECT_LOCK_PARAM); //There are two cases for S3 listing //1.has prefix (Ex:GET /?prefix=prefixName&delimiter=&encoding-type=url) @@ -118,7 +119,7 @@ public static NamedBlobPath parseS3(String path, Map args) throw } return new NamedBlobPath(accountName, containerName, null, blobNamePrefix, pageToken); } - if (isGetObjectLockRequest) { + if (isGetObjectLockRequest || isBatchDelete) { return new NamedBlobPath(accountName, containerName, null, null, null); } else { String blobName = splitPath[3]; diff --git a/ambry-api/src/main/java/com/github/ambry/frontend/s3/S3MessagePayload.java b/ambry-api/src/main/java/com/github/ambry/frontend/s3/S3MessagePayload.java index 8f57876d26..94550dd05e 100644 --- a/ambry-api/src/main/java/com/github/ambry/frontend/s3/S3MessagePayload.java +++ b/ambry-api/src/main/java/com/github/ambry/frontend/s3/S3MessagePayload.java @@ -14,9 +14,12 @@ */ package com.github.ambry.frontend.s3; +import java.util.ArrayList; import java.util.List; import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper; +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement; import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty; +import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -351,4 +354,143 @@ public String toString() { return "Bucket=" + bucket + ", Key=" + key + ", UploadId=" + uploadId; } } + + public static class S3BatchDeleteObjects { + + // Ensure that the "Delete" wrapper element is mapped correctly to the list of "Object" elements + @JacksonXmlElementWrapper(useWrapping = false) // Avoids wrapping the element itself + @JacksonXmlProperty(localName = "Object") // Specifies that each element maps to an instance of S3BatchDeleteKeys + private List objects; + + public List getObjects() { + return objects; + } + + public void setObjects(List objects) { + this.objects = objects; + } + + @Override + public String toString() { + return "S3BatchDeleteObjects{" + + "objects=" + objects + + '}'; + } + } + + public static class S3BatchDeleteKey { + + // Maps the element inside each to the 'key' property in S3BatchDeleteKeys + @JacksonXmlProperty(localName = "Key") + private String key; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + @Override + public String toString() { + return "S3BatchDeleteKey{" + + "key='" + key + '\'' + + '}'; + } + } + + // exact naming from S3BatchDelete response + public static class DeleteResult { + + @JacksonXmlElementWrapper(useWrapping = false) + @JacksonXmlProperty(localName = "Error") // Maps to in XML + private List errors; + + @JacksonXmlElementWrapper(useWrapping = false) + @JacksonXmlProperty(localName = "Deleted") // Maps to in XML + private List deleted; + + // Getters and setters + public List getErrors() { + return errors; + } + + public void setErrors(List errors) { + this.errors = errors; + } + + public List getDeleted() { + return deleted; + } + + public void setDeleted(List deleted) { + this.deleted = deleted; + } + } + + public static class S3ErrorObject { + + @JacksonXmlProperty(localName = "Key") + private String key; + + @JacksonXmlProperty(localName = "Code") + private String code; + + public S3ErrorObject(){} + + public S3ErrorObject(String key, String code) { + this.key = key; + this.code = code; + } + + // Getters and setters + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + @Override + public String toString() { + return "S3ErrorObject{key='" + key + "', code='" + code + "'}"; + } + } + + public static class S3DeletedObject { + + @JacksonXmlProperty(localName = "Key") + private String key; + + public S3DeletedObject() {} + + public S3DeletedObject(String key) { + this.key = key; + } + + // Getters and setters + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + @Override + public String toString() { + return "S3DeletedObject{key='" + key + "'}"; + } + } } + diff --git a/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java b/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java index 8868aa5822..6d9bab45db 100644 --- a/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java +++ b/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java @@ -66,6 +66,7 @@ public class RestUtils { public static final String PATH_SEPARATOR_STRING = "/"; public static final String STITCH = "STITCH"; public static final String UPLOADS_QUERY_PARAM = "uploads"; + public static final String BATCH_DELETE_QUERY_PARAM = "delete"; public static final String UPLOAD_ID_QUERY_PARAM = "uploadId"; public static final String CONTINUE = "100-continue"; public static final String OBJECT_LOCK_PARAM = "object-lock"; diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendMetrics.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendMetrics.java index a71c9ffa40..b1272647ec 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendMetrics.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendMetrics.java @@ -18,6 +18,7 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.github.ambry.config.FrontendConfig; +import com.github.ambry.frontend.s3.S3BatchDeleteHandler; import com.github.ambry.frontend.s3.S3DeleteHandler; import com.github.ambry.frontend.s3.S3GetHandler; import com.github.ambry.frontend.s3.S3ListHandler; @@ -35,6 +36,7 @@ public class FrontendMetrics { // RestRequestMetricsGroup // DELETE public final RestRequestMetricsGroup deleteBlobMetricsGroup; + public final RestRequestMetricsGroup batchDeleteMetricsGroup; public final RestRequestMetricsGroup deleteDatasetsMetricsGroup; //COPY public final RestRequestMetricsGroup copyBlobMetricsGroup; @@ -165,6 +167,7 @@ public class FrontendMetrics { public final AsyncOperationTracker.Metrics deleteDatasetOutOfRetentionRequestMetrics; public final AsyncOperationTracker.Metrics s3DeleteHandleMetrics; + public final AsyncOperationTracker.Metrics s3BatchDeleteHandleMetrics; public final AsyncOperationTracker.Metrics s3ListHandleMetrics; public final AsyncOperationTracker.Metrics s3PutHandleMetrics; public final AsyncOperationTracker.Metrics s3GetHandleMetrics; @@ -315,6 +318,9 @@ public FrontendMetrics(MetricRegistry metricRegistry, FrontendConfig frontendCon deleteBlobMetricsGroup = new RestRequestMetricsGroup(FrontendRestRequestService.class, "DeleteBlob", false, metricRegistry, frontendConfig); + batchDeleteMetricsGroup = + new RestRequestMetricsGroup(FrontendRestRequestService.class, "BatchDeleteBlob", false, metricRegistry, + frontendConfig); deleteDatasetsMetricsGroup = new RestRequestMetricsGroup(FrontendRestRequestService.class, "DeleteDataset", false, metricRegistry, frontendConfig); @@ -505,6 +511,7 @@ public FrontendMetrics(MetricRegistry metricRegistry, FrontendConfig frontendCon new AsyncOperationTracker.Metrics(NamedBlobPutHandler.class, "RetentionRequest", metricRegistry); s3DeleteHandleMetrics = new AsyncOperationTracker.Metrics(S3DeleteHandler.class, "S3Handle", metricRegistry); + s3BatchDeleteHandleMetrics = new AsyncOperationTracker.Metrics(S3BatchDeleteHandler.class, "S3Handle", metricRegistry); s3ListHandleMetrics = new AsyncOperationTracker.Metrics(S3ListHandler.class, "S3Handle", metricRegistry); s3PutHandleMetrics = new AsyncOperationTracker.Metrics(S3PutHandler.class, "S3Handle", metricRegistry); s3GetHandleMetrics = new AsyncOperationTracker.Metrics(S3GetHandler.class, "S3Handle", metricRegistry); diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java index 123e1ec373..085f07fb47 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/FrontendRestRequestService.java @@ -18,6 +18,7 @@ import com.github.ambry.clustermap.ClusterMap; import com.github.ambry.commons.Callback; import com.github.ambry.config.FrontendConfig; +import com.github.ambry.frontend.s3.S3BatchDeleteHandler; import com.github.ambry.frontend.s3.S3DeleteHandler; import com.github.ambry.frontend.s3.S3GetHandler; import com.github.ambry.frontend.s3.S3HeadHandler; @@ -113,6 +114,7 @@ class FrontendRestRequestService implements RestRequestService { private PostDatasetsHandler postDatasetsHandler; private GetStatsReportHandler getStatsReportHandler; private S3DeleteHandler s3DeleteHandler; + private S3BatchDeleteHandler s3BatchDeleteHandler; private S3ListHandler s3ListHandler; private S3PutHandler s3PutHandler; private S3HeadHandler s3HeadHandler; @@ -240,7 +242,8 @@ public void start() throws InstantiationException { new S3MultipartUploadHandler(securityService, frontendMetrics, accountAndContainerInjector, frontendConfig, namedBlobDb, idConverter, router, quotaManager); s3DeleteHandler = new S3DeleteHandler(deleteBlobHandler, s3MultipartUploadHandler, frontendMetrics); - s3PostHandler = new S3PostHandler(s3MultipartUploadHandler); + s3BatchDeleteHandler = new S3BatchDeleteHandler(deleteBlobHandler, frontendMetrics); + s3PostHandler = new S3PostHandler(s3MultipartUploadHandler, s3BatchDeleteHandler); s3PutHandler = new S3PutHandler(namedBlobPutHandler, s3MultipartUploadHandler, frontendMetrics); s3ListHandler = new S3ListHandler(namedBlobListHandler, frontendMetrics); s3GetHandler = diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3BaseHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3BaseHandler.java index 03d7aee5db..e33d419690 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3BaseHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3BaseHandler.java @@ -44,7 +44,7 @@ * require a response body, otherwise it is {@link ReadableStreamChannel}. */ abstract public class S3BaseHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(S3BaseHandler.class); + protected static final Logger LOGGER = LoggerFactory.getLogger(S3BaseHandler.class); /** * Handles the S3 request and construct the response. @@ -122,6 +122,15 @@ public static boolean isMultipartCreateUploadRequest(RestRequest restRequest) { && restRequest.getArgs().containsKey(UPLOADS_QUERY_PARAM); } + /** + * @param restRequest the {@link RestRequest} that contains the request parameters. + * @return {@code True} if it is a request for batch delete. + */ + public static boolean isBatchDelete(RestRequest restRequest) { + return restRequest.getRestMethod() == RestMethod.POST && restRequest.getArgs().containsKey(S3_REQUEST) + && restRequest.getArgs().containsKey(BATCH_DELETE_QUERY_PARAM); + } + /** * @param restRequest the {@link RestRequest} that contains the request parameters. * @return {@code True} if it is a completion/abortion of multipart uploads. diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3BatchDeleteHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3BatchDeleteHandler.java new file mode 100644 index 0000000000..276a457fbd --- /dev/null +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3BatchDeleteHandler.java @@ -0,0 +1,176 @@ +/* + * Copyright 2025 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + */ +package com.github.ambry.frontend.s3; + +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import com.github.ambry.commons.ByteBufferReadableStreamChannel; +import com.github.ambry.commons.Callback; +import com.github.ambry.frontend.DeleteBlobHandler; +import com.github.ambry.frontend.FrontendMetrics; +import com.github.ambry.rest.NoOpResponseChannel; +import com.github.ambry.rest.RequestPath; +import com.github.ambry.rest.ResponseStatus; +import com.github.ambry.rest.RestRequest; +import com.github.ambry.rest.RestRequestMetrics; +import com.github.ambry.rest.RestResponseChannel; +import com.github.ambry.commons.RetainingAsyncWritableChannel; +import com.github.ambry.rest.RestServiceErrorCode; +import com.github.ambry.rest.RestServiceException; +import com.github.ambry.rest.WrappedRestRequest; +import com.github.ambry.router.ReadableStreamChannel; +import io.netty.buffer.ByteBuf; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import com.fasterxml.jackson.dataformat.xml.ser.ToXmlGenerator; + +import static com.github.ambry.frontend.FrontendUtils.*; +import static com.github.ambry.frontend.s3.S3Constants.*; +import static com.github.ambry.rest.RestUtils.*; + +public class S3BatchDeleteHandler extends S3BaseHandler { + private static DeleteBlobHandler deleteBlobHandler; + private final XmlMapper xmlMapper; + private FrontendMetrics metrics; + + // Constructor + public S3BatchDeleteHandler(DeleteBlobHandler deleteBlobHandler, FrontendMetrics frontendMetrics) { + this.deleteBlobHandler = deleteBlobHandler; + this.metrics = frontendMetrics; + this.xmlMapper = new XmlMapper(); + this.xmlMapper.configure(ToXmlGenerator.Feature.WRITE_XML_DECLARATION, true); + } + + /** + * Handles the S3 request and constructs the response. + * + * @param restRequest the {@link RestRequest} that contains the request headers and body. + * @param restResponseChannel the {@link RestResponseChannel} that contains the response headers and body. + * @param callback the {@link Callback} to invoke when the response is ready (or if there is an exception). + * @throws RestServiceException exception when the processing fails + */ + @Override + protected void doHandle(RestRequest restRequest, RestResponseChannel restResponseChannel, + Callback callback) throws RestServiceException { + + // Create the channel to read the request body + RetainingAsyncWritableChannel channel = new RetainingAsyncWritableChannel(); + + // inject metrics for batch delete + RestRequestMetrics requestMetrics = + metrics.batchDeleteMetricsGroup.getRestRequestMetrics(restRequest.isSslUsed(), false); + restRequest.getMetricsTracker().injectMetrics(requestMetrics); + + // Pass the callback to handle the response + restRequest.readInto(channel, + parseRequestBodyAndDeleteCallback(channel, restRequest, deleteBlobHandler, restResponseChannel, callback)); + } + + /** + * This method will read the request body, deserialize it, and trigger the batch delete process. + */ + private Callback parseRequestBodyAndDeleteCallback(RetainingAsyncWritableChannel channel, + RestRequest restRequest, DeleteBlobHandler deleteBlobHandler, RestResponseChannel restResponseChannel, + Callback finalCallback) { + + return buildCallback(metrics.s3BatchDeleteHandleMetrics, bytesRead -> { + if (bytesRead == 0) { + throw new RestServiceException("bytesRead is empty", RestServiceErrorCode.BadRequest); + } + + // deserialize the xml as deleteRequest + S3MessagePayload.S3BatchDeleteObjects deleteRequest = deserializeRequest(channel); +; + // validate the request for size + if (deleteRequest.getObjects().size() > MAX_BATCH_DELETE_SIZE) { + String batchSizeErrorMessage = "Exceeded Maximum Batch Size of "; + throw new RestServiceException(batchSizeErrorMessage, RestServiceErrorCode.BadRequest); + } + + RequestPath requestPath = (RequestPath) restRequest.getArgs().get(InternalKeys.REQUEST_PATH); + + // create objects for processing the request + ConcurrentLinkedQueue errors = new ConcurrentLinkedQueue<>(); + ConcurrentLinkedQueue deleted = new ConcurrentLinkedQueue<>(); + List> deleteFutures = new ArrayList<>(); + + for (S3MessagePayload.S3BatchDeleteKey object : deleteRequest.getObjects()) { + String singleDeletePath = requestPath.getPathAfterPrefixes() + "/" + object.getKey(); + RequestPath newRequestPath = + RequestPath.parse(singleDeletePath, restRequest.getArgs(), new ArrayList<>(), requestPath.getClusterName()); + WrappedRestRequest singleDeleteRequest = new WrappedRestRequest(restRequest); + singleDeleteRequest.setArg(InternalKeys.REQUEST_PATH, newRequestPath); + NoOpResponseChannel noOpResponseChannel = new NoOpResponseChannel(); + CompletableFuture future = new CompletableFuture<>(); + + // Handle the delete operation using the deleteBlobHandler + deleteBlobHandler.handle(singleDeleteRequest, noOpResponseChannel, (result, exception) -> { + // Call our custom onDeleteCompletion to track success/failure + if (exception == null) { + deleted.add(new S3MessagePayload.S3DeletedObject(object.getKey())); + } + else if (exception instanceof RestServiceException) { + RestServiceException restServiceException = (RestServiceException) exception; + errors.add((new S3MessagePayload.S3ErrorObject(object.getKey(), restServiceException.getErrorCode().toString()))); + } + else { + errors.add((new S3MessagePayload.S3ErrorObject(object.getKey(), RestServiceErrorCode.InternalServerError.toString()))); + } + future.complete(null); + }); + deleteFutures.add(future); + } + + CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])) + .whenComplete((result, exception) -> { + try { + // construct and serialize response + S3MessagePayload.DeleteResult response = new S3MessagePayload.DeleteResult(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + response.setDeleted(new ArrayList<>(deleted)); + response.setErrors(new ArrayList<>(errors)); + xmlMapper.writeValue(outputStream, response); + ReadableStreamChannel readableStreamChannel = + new ByteBufferReadableStreamChannel(ByteBuffer.wrap(outputStream.toByteArray())); + restResponseChannel.setHeader(Headers.CONTENT_LENGTH, readableStreamChannel.getSize()); + restResponseChannel.setHeader(Headers.CONTENT_TYPE, XML_CONTENT_TYPE); + restResponseChannel.setStatus(ResponseStatus.Ok); + finalCallback.onCompletion(readableStreamChannel, null); + } catch (IOException | RestServiceException e) { + finalCallback.onCompletion(null, e); + } + }); + }, restRequest.getUri(), LOGGER, finalCallback); + } + + /** + * Deserialize the request body into an S3BatchDeleteObjects. + */ + public S3MessagePayload.S3BatchDeleteObjects deserializeRequest(RetainingAsyncWritableChannel channel) + throws RestServiceException { + try { + ByteBuf byteBuffer = channel.consumeContentAsByteBuf(); + byte[] byteArray = new byte[byteBuffer.readableBytes()]; + byteBuffer.readBytes(byteArray); + return new XmlMapper().readValue(byteArray, S3MessagePayload.S3BatchDeleteObjects.class); + } catch (Exception e) { + throw new RestServiceException("failed to deserialize", e, RestServiceErrorCode.BadRequest); + } + } +} diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3Constants.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3Constants.java index 47b52963ea..e0ad3f55b9 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3Constants.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3Constants.java @@ -15,6 +15,7 @@ package com.github.ambry.frontend.s3; public class S3Constants { + public static final int MAX_BATCH_DELETE_SIZE = 1000; public static final int MIN_PART_NUM = 1; public static final int MAX_PART_NUM = 10000; public static final int MAX_LIST_SIZE = MAX_PART_NUM; // since parts are contiguous, the list size cannot exceed the max part number diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3PostHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3PostHandler.java index 2c894c821a..330a360e1d 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3PostHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/s3/S3PostHandler.java @@ -27,12 +27,14 @@ */ public class S3PostHandler extends S3BaseHandler { private final S3MultipartUploadHandler multipartPostHandler; + private final S3BatchDeleteHandler s3BatchDeleteHandler; /** * Constructs a handler for uploading s3 POST requests. */ - public S3PostHandler(S3MultipartUploadHandler multipartPostHandler) { + public S3PostHandler(S3MultipartUploadHandler multipartPostHandler, S3BatchDeleteHandler s3BatchDeleteHandler) { this.multipartPostHandler = multipartPostHandler; + this.s3BatchDeleteHandler = s3BatchDeleteHandler; } /** @@ -48,6 +50,8 @@ protected void doHandle(RestRequest restRequest, RestResponseChannel restRespons if (S3MultipartUploadHandler.isMultipartCreateUploadRequest(restRequest) || S3MultipartUploadHandler.isMultipartCompleteUploadRequest(restRequest)) { multipartPostHandler.handle(restRequest, restResponseChannel, callback); + } else if (isBatchDelete(restRequest)) { + s3BatchDeleteHandler.handle(restRequest, restResponseChannel, callback); } else { // Placeholder for handling any non-multipart S3 POST requests in the future. throw new RestServiceException("Unsupported S3 POST request", RestServiceErrorCode.BadRequest); diff --git a/ambry-frontend/src/test/java/com/github/ambry/frontend/S3BatchDeleteHandlerTest.java b/ambry-frontend/src/test/java/com/github/ambry/frontend/S3BatchDeleteHandlerTest.java new file mode 100644 index 0000000000..1de9138e11 --- /dev/null +++ b/ambry-frontend/src/test/java/com/github/ambry/frontend/S3BatchDeleteHandlerTest.java @@ -0,0 +1,172 @@ +/* + * Copyright 2025 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + */ +package com.github.ambry.frontend; +import java.nio.charset.StandardCharsets; + +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import com.github.ambry.account.Account; +import com.github.ambry.account.Container; +import com.github.ambry.account.ContainerBuilder; +import com.github.ambry.account.InMemAccountService; +import com.github.ambry.clustermap.MockClusterMap; +import com.github.ambry.commons.ByteBufferReadableStreamChannel; +import com.github.ambry.commons.CommonTestUtils; +import com.github.ambry.config.FrontendConfig; +import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.frontend.s3.S3BatchDeleteHandler; +import com.github.ambry.frontend.s3.S3MessagePayload; +import com.github.ambry.named.NamedBlobDb; +import com.github.ambry.named.NamedBlobDbFactory; +import com.github.ambry.quota.QuotaTestUtils; +import com.github.ambry.rest.MockRestResponseChannel; +import com.github.ambry.rest.RequestPath; +import com.github.ambry.rest.ResponseStatus; +import com.github.ambry.rest.RestMethod; +import com.github.ambry.rest.RestRequest; +import com.github.ambry.rest.RestResponseChannel; +import com.github.ambry.rest.RestUtils; +import com.github.ambry.router.FutureResult; +import com.github.ambry.router.InMemoryRouter; +import com.github.ambry.router.ReadableStreamChannel; +import com.github.ambry.utils.TestUtils; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.Properties; +import org.json.JSONObject; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class S3BatchDeleteHandlerTest { + private static final InMemAccountService ACCOUNT_SERVICE = new InMemAccountService(false, true); + private static final String CONTENT_TYPE = "text/plain"; + private static final String CLUSTER_NAME = "ambry-test"; + private static final String KEY_NAME = "key-success"; + private static final String KEY_NAME_2 = "key-name-2"; + private static final String KEY_NAME_3 = "key-success-2"; + private final Account account; + private final Container container; + private FrontendConfig frontendConfig; + private NamedBlobPutHandler namedBlobPutHandler; + private S3BatchDeleteHandler s3BatchDeleteHandler; + + public S3BatchDeleteHandlerTest() throws Exception { + account = ACCOUNT_SERVICE.createAndAddRandomAccount(); + container = new ContainerBuilder().setName("container-a") + .setId((short) 10) + .setParentAccountId(account.getId()) + .setStatus(Container.ContainerStatus.ACTIVE) + .setNamedBlobMode(Container.NamedBlobMode.OPTIONAL) + .build(); + account.updateContainerMap(Collections.singletonList(container)); + setup(); + performPutOperation(KEY_NAME, CONTENT_TYPE, container, account); + performPutOperation(KEY_NAME_2, CONTENT_TYPE, container, account); + performPutOperation(KEY_NAME_3, KEY_NAME, container, account); + } + + @Test + public void deleteObjectTest() throws Exception { + String uri = String.format("/s3/%s/%s", account.getName(), container.getName()); + // tests one correct delete and one error + String xmlBody = "" + + "" + + "" + + "key-success" + + "" + + "" + + "key-error" + + "" + + "" + + "key-error2" + + "" + + "" + + "key-success-2" + + "" + + ""; + + byte[] xmlBytes = xmlBody.getBytes("UTF-8"); + RestRequest request = + FrontendRestRequestServiceTest.createRestRequest(RestMethod.POST, uri, new JSONObject(), new LinkedList<>(Arrays.asList(ByteBuffer.wrap(xmlBytes), null))); + RestResponseChannel restResponseChannel = new MockRestResponseChannel(); + request.setArg(RestUtils.InternalKeys.REQUEST_PATH, + RequestPath.parse(request, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME)); + FutureResult futureResult = new FutureResult<>(); + s3BatchDeleteHandler.handle(request, restResponseChannel, futureResult::done); + ReadableStreamChannel readableStreamChannel = futureResult.get(); + ByteBuffer byteBuffer = ((ByteBufferReadableStreamChannel) readableStreamChannel).getContent(); + // verify correct XML via result value + String result = new String(byteBuffer.array(), StandardCharsets.UTF_8); + XmlMapper xmlMapper = new XmlMapper(); + S3MessagePayload.DeleteResult response = + xmlMapper.readValue(byteBuffer.array(), S3MessagePayload.DeleteResult.class); + assertEquals(response.getDeleted().get(0).getKey(), KEY_NAME); + assertEquals(response.getErrors().get(0).toString(), new S3MessagePayload.S3ErrorObject("key-error","InternalServerError").toString()); + assertEquals("Mismatch on status", ResponseStatus.Ok, restResponseChannel.getStatus()); + } + + private void setup() throws Exception { + Properties properties = new Properties(); + CommonTestUtils.populateRequiredRouterProps(properties); + VerifiableProperties verifiableProperties = new VerifiableProperties(properties); + frontendConfig = new FrontendConfig(verifiableProperties); + FrontendMetrics metrics = new FrontendMetrics(new MetricRegistry(), frontendConfig); + AccountAndContainerInjector injector = new AccountAndContainerInjector(ACCOUNT_SERVICE, metrics, frontendConfig); + IdSigningService idSigningService = new AmbryIdSigningService(); + AmbrySecurityServiceFactory securityServiceFactory = + new AmbrySecurityServiceFactory(verifiableProperties, new MockClusterMap(), ACCOUNT_SERVICE, null, + idSigningService, injector, QuotaTestUtils.createDummyQuotaManager()); + SecurityService securityService = securityServiceFactory.getSecurityService(); + NamedBlobDbFactory namedBlobDbFactory = + new TestNamedBlobDbFactory(verifiableProperties, new MetricRegistry(), ACCOUNT_SERVICE); + NamedBlobDb namedBlobDb = namedBlobDbFactory.getNamedBlobDb(); + AmbryIdConverterFactory ambryIdConverterFactory = + new AmbryIdConverterFactory(verifiableProperties, new MetricRegistry(), idSigningService, namedBlobDb); + InMemoryRouter router = new InMemoryRouter(verifiableProperties, new MockClusterMap(), ambryIdConverterFactory); + namedBlobPutHandler = + new NamedBlobPutHandler(securityService, namedBlobDb, ambryIdConverterFactory.getIdConverter(), + idSigningService, router, injector, frontendConfig, metrics, CLUSTER_NAME, + QuotaTestUtils.createDummyQuotaManager(), ACCOUNT_SERVICE, null); + DeleteBlobHandler deleteBlobHandler = + new DeleteBlobHandler(router, securityService, ambryIdConverterFactory.getIdConverter(), injector, metrics, + new MockClusterMap(), QuotaTestUtils.createDummyQuotaManager(), ACCOUNT_SERVICE); + s3BatchDeleteHandler = new S3BatchDeleteHandler(deleteBlobHandler, metrics); + } + + private void performPutOperation(String keyName, String contentType, Container container, Account account) throws Exception { + String requestPath = String.format("/named/%s/%s/%s", account.getName(), container.getName(), keyName); + JSONObject headers = new JSONObject(); + FrontendRestRequestServiceTest.setAmbryHeadersForPut(headers, TestUtils.TTL_SECS, container.isCacheable(), + "test-app", contentType, "tester", null, null, null); + + byte[] content = TestUtils.getRandomBytes(1024); + + RestRequest request = FrontendRestRequestServiceTest.createRestRequest(RestMethod.PUT, requestPath, headers, + new LinkedList<>(Arrays.asList(ByteBuffer.wrap(content), null))); + + request.setArg(RestUtils.InternalKeys.REQUEST_PATH, + RequestPath.parse(request, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME)); + + RestResponseChannel restResponseChannel = new MockRestResponseChannel(); + FutureResult putResult = new FutureResult<>(); + namedBlobPutHandler.handle(request, restResponseChannel, putResult::done); + putResult.get(); + } +} + diff --git a/ambry-frontend/src/test/java/com/github/ambry/frontend/S3MultipartUploadTest.java b/ambry-frontend/src/test/java/com/github/ambry/frontend/S3MultipartUploadTest.java index af09b3608a..ae161a681f 100644 --- a/ambry-frontend/src/test/java/com/github/ambry/frontend/S3MultipartUploadTest.java +++ b/ambry-frontend/src/test/java/com/github/ambry/frontend/S3MultipartUploadTest.java @@ -364,7 +364,7 @@ private void setup() throws Exception { s3MultipartUploadHandler = new S3MultipartUploadHandler(securityService, metrics, injector, frontendConfig, namedBlobDb, idConverter, router, quotaManager); - s3PostHandler = new S3PostHandler(s3MultipartUploadHandler); + s3PostHandler = new S3PostHandler(s3MultipartUploadHandler, null); s3PutHandler = new S3PutHandler(namedBlobPutHandler, s3MultipartUploadHandler, metrics); s3DeleteHandler = new S3DeleteHandler(deleteBlobHandler, s3MultipartUploadHandler, metrics); } diff --git a/ambry-rest/src/main/java/com/github/ambry/rest/WrappedRestRequest.java b/ambry-rest/src/main/java/com/github/ambry/rest/WrappedRestRequest.java index 807e3c4d3e..60adfe454e 100644 --- a/ambry-rest/src/main/java/com/github/ambry/rest/WrappedRestRequest.java +++ b/ambry-rest/src/main/java/com/github/ambry/rest/WrappedRestRequest.java @@ -18,6 +18,8 @@ import java.io.IOException; import java.security.NoSuchAlgorithmException; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import javax.net.ssl.SSLSession; @@ -29,9 +31,13 @@ */ public class WrappedRestRequest implements RestRequest { private final RestRequest restRequest; + private final Map map = new ConcurrentHashMap<>(); + private final RestRequestMetricsTracker restRequestMetricsTracker = new RestRequestMetricsTracker(); + public WrappedRestRequest(RestRequest restRequest) { this.restRequest = restRequest; + map.putAll(restRequest.getArgs()); } @Override @@ -56,16 +62,16 @@ public void setRestMethod(RestMethod restMethod) { @Override public Map getArgs() { - return restRequest.getArgs(); + return map; } @Override public Object setArg(String key, Object value) { - return restRequest.setArg(key, value); + return map.put(key, value); } @Override - public void removeArg(String key) { restRequest.removeArg(key); } + public void removeArg(String key) { map.remove(key); } @Override public SSLSession getSSLSession() { @@ -89,7 +95,7 @@ public void close() throws IOException { @Override public RestRequestMetricsTracker getMetricsTracker() { - return restRequest.getMetricsTracker(); + return restRequestMetricsTracker; } @Override