Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/linkedin/ambry into dw-fi…
Browse files Browse the repository at this point in the history
…lecopy-apis
  • Loading branch information
DevenAhluwalia committed Jan 21, 2025
2 parents 4d89d8a + 5df7368 commit 50bef4f
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,17 @@ public void testRenameDatasetVersion() throws Exception {
mySqlAccountStore.getDatasetVersion(testAccount.getId(), testContainer.getId(), testAccount.getName(),
testContainer.getName(), DATASET_NAME_RENAME, sourceVersion);
assertNull("Rename from should be null", datasetVersionRecord.getRenameFrom());

//rename a deleted version should return not found
mySqlAccountStore.deleteDatasetVersion(testAccount.getId(), testContainer.getId(), DATASET_NAME_RENAME, sourceVersion);
targetVersion = "5.5.5.5";
try {
mySqlAccountStore.renameDatasetVersion(testAccount.getId(), testContainer.getId(), testAccount.getName(),
testContainer.getName(), DATASET_NAME_RENAME, renamedSourceVersion, targetVersion);
fail();
} catch (AccountServiceException e) {
assertEquals("Mismatch on error code", AccountServiceErrorCode.NotFound, e.getErrorCode());
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,10 @@ public DatasetDao(MySqlDataAccessor dataAccessor, MySqlAccountServiceConfig mySq
//copy a dataset version from a source version.
//We need to update the modify time so counter based purge policy won't delete it.
copyToNewDatasetVersionSql = String.format(
"INSERT INTO %1$s (%2$s, %3$s, %4$s, %5$s, %6$s, %7$s, %8$s, %9$s, %10$s) SELECT %2$s, %3$s, %4$s, ?, "
+ "%6$s, NOW(3), %8$s, %9$s, ? FROM %1$s WHERE %2$s = ? AND %3$s = ? AND %4$s = ? AND %5$s = ? AND %9$s = ?",
"INSERT INTO %1$s (%2$s, %3$s, %4$s, %5$s, %6$s, %7$s, %8$s, %9$s, %10$s) "
+ "SELECT %2$s, %3$s, %4$s, ?, %6$s, NOW(3), %8$s, %9$s, ? "
+ "FROM %1$s WHERE %2$s = ? AND %3$s = ? AND %4$s = ? AND %5$s = ? AND %9$s = ? "
+ "AND (%8$s IS NULL OR %8$s > NOW(3))",
DATASET_VERSION_TABLE, ACCOUNT_ID, CONTAINER_ID, DATASET_NAME, VERSION, CREATION_TIME, LAST_MODIFIED_TIME,
DELETED_TS, DATASET_VERSION_STATE, RENAME_FROM);
//dataset version has in_progress and ready states.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
*/
package com.github.ambry.frontend.s3;

public class S3Constants {
public static final int MIN_PART_NUM = 1;
public static final int MAX_PART_NUM = 10000;
public static final int MAX_LIST_SIZE = MAX_PART_NUM; // since parts are contiguous, the list size cannot exceed the max part number

// Error Messages
public static final String ERR_INVALID_MULTIPART_UPLOAD = "Invalid multipart upload.";
public static final String ERR_INVALID_PART_NUMBER =
"Invalid part number: %s. " + String.format("Part number must be an integer between %s and %s.", MIN_PART_NUM, MAX_PART_NUM);
public static final String ERR_DUPLICATE_PART_NUMBER = "Duplicate part number found: %s.";
public static final String ERR_DUPLICATE_ETAG = "Duplicate eTag found: %s.";
public static final String ERR_EMPTY_REQUEST_BODY = "Xml request body cannot be empty.";
public static final String ERR_PART_LIST_TOO_LONG = String.format("Parts list size cannot exceed %s.", MAX_LIST_SIZE);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.Comparator;
import java.util.EnumSet;
import java.util.GregorianCalendar;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -381,13 +382,13 @@ private boolean isRetriable(Throwable throwable) {
*/
List<ChunkInfo> getChunksToStitch(CompleteMultipartUpload completeMultipartUpload) throws RestServiceException {
// Get parts in order from CompleteMultipartUpload, deserialize each part id to get data chunk ids.
List<Part> parts = validatePartsOrThrow(completeMultipartUpload);
List<ChunkInfo> chunkInfos = new ArrayList<>();
try {
// sort the list in order
List<Part> sortedParts = Arrays.asList(completeMultipartUpload.getPart());
Collections.sort(sortedParts, Comparator.comparingInt(Part::getPartNumber));
Collections.sort(parts, Comparator.comparingInt(Part::getPartNumber));
String reservedMetadataId = null;
for (Part part : sortedParts) {
for (Part part : parts) {
S3MultipartETag eTag = S3MultipartETag.deserialize(part.geteTag());
// TODO [S3]: decide the life cycle of S3.
long expirationTimeInMs = -1;
Expand Down Expand Up @@ -415,4 +416,75 @@ List<ChunkInfo> getChunksToStitch(CompleteMultipartUpload completeMultipartUploa
return chunkInfos;
}
}

/**
* Check the list size and part number before processing request
* 1. Disallow duplicate part numbers
* 2. Disallow duplicate etags
* 3. Check for list size 10000
* 4. Check for part numbers integer 1-10000
* @param request the {@link CompleteMultipartUpload} request
* @return the bad request error
*/
List<Part> validatePartsOrThrow(CompleteMultipartUpload request) throws RestServiceException {
List<Part> parts = getParts(request);
Set<Integer> partNumbers = new HashSet<>();
Set<String> etags = new HashSet<>();
for (Part part : parts) {
int partNumber = getPartNumber(part);
if (partNumber < S3Constants.MIN_PART_NUM || partNumber > S3Constants.MAX_PART_NUM) {
String error = String.format(S3Constants.ERR_INVALID_PART_NUMBER, partNumber);
throw new RestServiceException(error, RestServiceErrorCode.BadRequest);
}
if (!partNumbers.add(partNumber)) {
String error = String.format(S3Constants.ERR_DUPLICATE_PART_NUMBER, partNumber);
throw new RestServiceException(error, RestServiceErrorCode.BadRequest);
}
String etag = part.geteTag();
if (!etags.add(etag)) {
String error = String.format(S3Constants.ERR_DUPLICATE_ETAG, etag);
throw new RestServiceException(error, RestServiceErrorCode.BadRequest);
}
}
return parts;
}

/**
* Get the part number from the part object
* @param part
* @return
* @throws RestServiceException
*/
int getPartNumber(Part part) throws RestServiceException {
try {
return part.getPartNumber();
} catch (NumberFormatException e) {
// cannot use getPartNumber() here as it would cause another exception
String error = String.format(S3Constants.ERR_INVALID_PART_NUMBER, part);
throw new RestServiceException(error, RestServiceErrorCode.BadRequest);
} catch (Throwable e) {
// any other exception is invalid
throw new RestServiceException(S3Constants.ERR_INVALID_MULTIPART_UPLOAD, RestServiceErrorCode.BadRequest);
}
}

/**
* Get the list of parts from the request
* @param request
* @return
* @throws RestServiceException
*/
List<Part> getParts(CompleteMultipartUpload request) throws RestServiceException {
Part[] part = request.getPart();
if (part == null) {
throw new RestServiceException(S3Constants.ERR_EMPTY_REQUEST_BODY, RestServiceErrorCode.BadRequest);
}
// Arrays.asList() can return an empty list, but only if it is called with no arguments.
// Therefore, the list below will always have at least one element as we are passing an argument.
List<Part> parts = Arrays.asList(part);
if (parts.size() > S3Constants.MAX_LIST_SIZE) {
throw new RestServiceException(S3Constants.ERR_PART_LIST_TOO_LONG, RestServiceErrorCode.BadRequest);
}
return parts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import com.github.ambry.account.Account;
import com.github.ambry.account.AccountService;
import com.github.ambry.account.Container;
import com.github.ambry.account.ContainerBuilder;
import com.github.ambry.account.InMemAccountService;
Expand All @@ -29,6 +28,7 @@
import com.github.ambry.commons.CommonTestUtils;
import com.github.ambry.config.FrontendConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.frontend.s3.S3Constants;
import com.github.ambry.frontend.s3.S3DeleteHandler;
import com.github.ambry.frontend.s3.S3MultipartAbortUploadHandler;
import com.github.ambry.frontend.s3.S3MultipartUploadHandler;
Expand All @@ -49,7 +49,6 @@
import com.github.ambry.router.FutureResult;
import com.github.ambry.router.InMemoryRouter;
import com.github.ambry.router.ReadableStreamChannel;
import com.github.ambry.router.Router;
import com.github.ambry.utils.TestUtils;
import com.github.ambry.utils.Utils;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -245,6 +244,93 @@ public void multiPartUploadTest() throws Exception {
assertEquals("Mismatch on status", ResponseStatus.NoContent, restResponseChannel.getStatus());
}

@Test
public void testDuplicatePartNumbers() throws Exception {
Part part1 = new Part("1", "etag1");
Part part2 = new Part("1", "etag2");
Part[] parts = {part2, part1};
String expectedMessage = String.format(S3Constants.ERR_DUPLICATE_PART_NUMBER, 1);
testMultipartUploadWithInvalidParts(parts, expectedMessage);
}

@Test
public void testDuplicateEtags() throws Exception {
Part part1 = new Part("1", "etag1");
Part part2 = new Part("2", "etag1");
Part[] parts = {part2, part1};
String expectedMessage = String.format(S3Constants.ERR_DUPLICATE_ETAG, "etag1");
testMultipartUploadWithInvalidParts(parts, expectedMessage);
}

@Test
public void testInvalidPartNumLessThanMin() throws Exception {
Part part1 = new Part("0", "etag1");
Part part2 = new Part("1", "etag2");
Part[] parts = {part2, part1};
String expectedMessage = String.format(S3Constants.ERR_INVALID_PART_NUMBER, 0);
testMultipartUploadWithInvalidParts(parts, expectedMessage);
}

@Test
public void testPartNumberInvalidExceedsMax() throws Exception {
int invalidPartNumber = S3Constants.MAX_PART_NUM + 1;
Part part1 = new Part("2", "etag1");
Part part2 = new Part(String.valueOf(invalidPartNumber), "etag2");
Part[] parts = {part2, part1};
String expectedMessage = String.format(S3Constants.ERR_INVALID_PART_NUMBER, invalidPartNumber);
testMultipartUploadWithInvalidParts(parts, expectedMessage);
}

@Test
public void testExceedMaxParts() throws Exception {
Part[] parts = new Part[S3Constants.MAX_LIST_SIZE + 1];
for (int i = 1; i <= S3Constants.MAX_LIST_SIZE + 1; i++) {
parts[i - 1] = new Part(String.valueOf(i), "eTag" + i);
}
String expectedMessage = S3Constants.ERR_PART_LIST_TOO_LONG;
testMultipartUploadWithInvalidParts(parts, expectedMessage);
}

@Test
public void testEmptyPartList() throws Exception {
Part[] parts = {};
String expectedMessage = S3Constants.ERR_EMPTY_REQUEST_BODY;
testMultipartUploadWithInvalidParts(parts, expectedMessage);
}

private void testMultipartUploadWithInvalidParts(Part[] parts, String expectedErrorMessage) throws Exception {
String accountName = account.getName();
String containerName = container.getName();
String blobName = "MyDirectory/MyKey";
String uploadId = "uploadId";
String uri = S3_PREFIX + SLASH + accountName + SLASH + containerName + SLASH + blobName + "?uploadId=" + uploadId;
JSONObject headers = new JSONObject();

CompleteMultipartUpload completeMultipartUpload = new CompleteMultipartUpload(parts);
XmlMapper xmlMapper = new XmlMapper();
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
xmlMapper.writeValue(byteArrayOutputStream, completeMultipartUpload);
String completeMultipartStr = byteArrayOutputStream.toString();
byte[] content = completeMultipartStr.getBytes(StandardCharsets.UTF_8);
int size = content.length;

headers.put(Headers.CONTENT_TYPE, OCTET_STREAM_CONTENT_TYPE);
headers.put(Headers.CONTENT_LENGTH, size);

RestRequest request = FrontendRestRequestServiceTest.createRestRequest(RestMethod.POST, uri, headers,
new LinkedList<>(Arrays.asList(ByteBuffer.wrap(content), null)));
request.setArg(InternalKeys.REQUEST_PATH,
RequestPath.parse(request, frontendConfig.pathPrefixesToRemove, CLUSTER_NAME));

RestResponseChannel restResponseChannel = new MockRestResponseChannel();
s3PostHandler.handle(request, restResponseChannel, (r, e) -> {
assertNotNull("Expected an exception, but none was thrown.", e);
assertTrue("Unexpected error message: " + e.getMessage(), e.getMessage().contains(expectedErrorMessage));
});
}



/**
* Initiates a {@link S3PutHandler}
*/
Expand Down

0 comments on commit 50bef4f

Please sign in to comment.