From 5ae0677ddadcc8dc4ca6594ca82c96942732bd11 Mon Sep 17 00:00:00 2001 From: nsivabalan Date: Thu, 12 May 2016 17:23:15 -0700 Subject: [PATCH] 1. Added support for BlobNotModifiedResponse 2. Fixed Connection leak in CoordinatorBackedRouter (#286) --- .../config/FrontendConfig.java | 9 ++ .../com.github.ambry/rest/ResponseStatus.java | 5 + .../java/com.github.ambry/rest/RestUtils.java | 37 +++++++ .../com.github.ambry/rest/RestUtilsTest.java | 35 ++++++- .../AmbryBlobStorageService.java | 26 ++++- .../AmbrySecurityService.java | 45 +++++++-- .../AmbryBlobStorageServiceTest.java | 30 ++++++ .../AmbrySecurityServiceTest.java | 72 ++++++++++++-- .../FrontendIntegrationTest.java | 96 ++++++++++++++----- .../NettyResponseChannel.java | 3 + .../CoordinatorBackedRouter.java | 8 ++ 11 files changed, 327 insertions(+), 39 deletions(-) diff --git a/ambry-api/src/main/java/com.github.ambry/config/FrontendConfig.java b/ambry-api/src/main/java/com.github.ambry/config/FrontendConfig.java index dc7f60bfd1..9b7b5b242e 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/FrontendConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/FrontendConfig.java @@ -50,6 +50,13 @@ public class FrontendConfig { @Default("") public final List frontendPathPrefixesToRemove; + /** + * Specifies the blob size in bytes beyond which chunked response will be sent for a getBlob() call + */ + @Config("frontend.chunked.get.response.threshold.in.bytes") + @Default("8192") + public final Integer frontendChunkedGetResponseThresholdInBytes; + public FrontendConfig(VerifiableProperties verifiableProperties) { frontendCacheValiditySeconds = verifiableProperties.getLong("frontend.cache.validity.seconds", 365 * 24 * 60 * 60); frontendIdConverterFactory = verifiableProperties @@ -58,5 +65,7 @@ public FrontendConfig(VerifiableProperties verifiableProperties) { .getString("frontend.security.service.factory", "com.github.ambry.frontend.AmbrySecurityServiceFactory"); frontendPathPrefixesToRemove = Arrays.asList(verifiableProperties.getString("frontend.path.prefixes.to.remove", "").split(",")); + frontendChunkedGetResponseThresholdInBytes = + verifiableProperties.getInt("frontend.chunked.get.response.threshold.in.bytes", 8192); } } diff --git a/ambry-api/src/main/java/com.github.ambry/rest/ResponseStatus.java b/ambry-api/src/main/java/com.github.ambry/rest/ResponseStatus.java index d4f5b711af..2d3632e205 100644 --- a/ambry-api/src/main/java/com.github.ambry/rest/ResponseStatus.java +++ b/ambry-api/src/main/java/com.github.ambry/rest/ResponseStatus.java @@ -31,6 +31,11 @@ public enum ResponseStatus { */ Accepted, + // 3xx + /** + * 304 Not Modified + */ + NotModified, // 4xx /** * 400 - Request was not correct. diff --git a/ambry-api/src/main/java/com.github.ambry/rest/RestUtils.java b/ambry-api/src/main/java/com.github.ambry/rest/RestUtils.java index ee50a118c1..313a701718 100644 --- a/ambry-api/src/main/java/com.github.ambry/rest/RestUtils.java +++ b/ambry-api/src/main/java/com.github.ambry/rest/RestUtils.java @@ -18,8 +18,11 @@ import com.github.ambry.utils.Utils; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,6 +111,11 @@ public static final class Headers { * Header to contain the Cookies */ public final static String COOKIE = "Cookie"; + /** + * Header to be set by the clients during a Get blob call to denote, that blob should be served only if the blob + * has been modified after the value set for this header. + */ + public static final String IF_MODIFIED_SINCE = "If-Modified-Since"; } /** @@ -131,6 +139,7 @@ public static final class MultipartPost { private static final int Crc_Size = 8; private static final short UserMetadata_Version_V1 = 1; + public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz"; private static Logger logger = LoggerFactory.getLogger(RestUtils.class); @@ -416,4 +425,32 @@ public static RestUtils.SubResource getBlobSubResource(RestRequest restRequest) } return subResource; } + + /** + * Fetch time in ms for the {@code dateString} passed in, since epoch + * @param dateString the String representation of the date that needs to be parsed + * @return Time in ms since epoch. Note http time is kept in Seconds so last three digits will be 000. + * Returns null if the {@code dateString} is not in the expected format or could not be parsed + */ + public static Long getTimeFromDateString(String dateString) { + try { + SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); + return dateFormatter.parse(dateString).getTime(); + } catch (ParseException e) { + logger.warn("Could not parse milliseconds from an HTTP date header (" + dateString + ")."); + return null; + } + } + + /** + * Reduces the precision of a time in milliseconds to seconds precision. Result returned is in milliseconds with last + * three digits 000. Useful for comparing times kept in milliseconds that get converted to seconds and back (as is + * done with HTTP date format). + * + * @param ms time that needs to be parsed + * @return milliseconds with seconds precision (last three digits 000). + */ + public static long toSecondsPrecisionInMs(long ms) { + return ms -= ms % 1000; + } } diff --git a/ambry-api/src/test/java/com.github.ambry/rest/RestUtilsTest.java b/ambry-api/src/test/java/com.github.ambry/rest/RestUtilsTest.java index 6744266f12..ed694acdbe 100644 --- a/ambry-api/src/test/java/com.github.ambry/rest/RestUtilsTest.java +++ b/ambry-api/src/test/java/com.github.ambry/rest/RestUtilsTest.java @@ -20,11 +20,15 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; import java.util.Arrays; +import java.util.Date; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Random; +import java.util.TimeZone; import org.json.JSONException; import org.json.JSONObject; import org.junit.Test; @@ -608,7 +612,7 @@ private void verifyUserMetadataConstructionSuccess(JSONObject headers, Map userMetadata = RestUtils.buildUserMetadata(routerResult.getUserMetadata()); if (userMetadata == null) { @@ -636,7 +644,7 @@ public void onCompletion(Void securityResult, Exception securityException) { securityException = e; } finally { securityCallbackTracker.markCallbackProcessingEnd(); - if (response != null || securityException != null) { + if (response != null || securityException != null || blobNotModified) { submitResponse(restRequest, restResponseChannel, response, securityException); } } @@ -654,6 +662,18 @@ public void onCompletion(Void securityResult, Exception securityException) { } } + /** + * Fetches the {@link RestUtils.Headers#IF_MODIFIED_SINCE} value in epoch time if present + * @param restRequest the {@link RestRequest} that needs to be parsed + * @return the {@link RestUtils.Headers#IF_MODIFIED_SINCE} value in epoch time if present + */ + private Long getIfModifiedSinceMs(RestRequest restRequest) { + if (restRequest.getArgs().get(RestUtils.Headers.IF_MODIFIED_SINCE) != null) { + return RestUtils.getTimeFromDateString((String) restRequest.getArgs().get(RestUtils.Headers.IF_MODIFIED_SINCE)); + } + return null; + } + /** * Sets the blob ID that should be used for {@link Router#getBlob(String, Callback)}. * @param blobId the blob ID that should be used for {@link Router#getBlob(String, Callback)}. diff --git a/ambry-frontend/src/main/java/com.github.ambry.frontend/AmbrySecurityService.java b/ambry-frontend/src/main/java/com.github.ambry.frontend/AmbrySecurityService.java index b359428e07..6ce647d548 100644 --- a/ambry-frontend/src/main/java/com.github.ambry.frontend/AmbrySecurityService.java +++ b/ambry-frontend/src/main/java/com.github.ambry.frontend/AmbrySecurityService.java @@ -40,11 +40,11 @@ class AmbrySecurityService implements SecurityService { private boolean isOpen; - private final long cacheValidityInSecs; + private final FrontendConfig frontendConfig; private final FrontendMetrics frontendMetrics; public AmbrySecurityService(FrontendConfig frontendConfig, FrontendMetrics frontendMetrics) { - cacheValidityInSecs = frontendConfig.frontendCacheValiditySeconds; + this.frontendConfig = frontendConfig; this.frontendMetrics = frontendMetrics; isOpen = true; } @@ -86,16 +86,31 @@ public Future processResponse(RestRequest restRequest, RestResponseChannel try { responseChannel.setStatus(ResponseStatus.Ok); responseChannel.setHeader(RestUtils.Headers.DATE, new GregorianCalendar().getTime()); - responseChannel - .setHeader(RestUtils.Headers.LAST_MODIFIED, new Date(blobInfo.getBlobProperties().getCreationTimeInMs())); if (restRequest.getRestMethod() == RestMethod.HEAD) { + responseChannel + .setHeader(RestUtils.Headers.LAST_MODIFIED, new Date(blobInfo.getBlobProperties().getCreationTimeInMs())); setHeadResponseHeaders(blobInfo, responseChannel); } else if (restRequest.getRestMethod() == RestMethod.GET) { RestUtils.SubResource subResource = RestUtils.getBlobSubResource(restRequest); if (subResource == null) { - setGetBlobResponseHeaders(responseChannel, blobInfo); + Long ifModifiedSinceMs = getIfModifiedSinceMs(restRequest); + if (ifModifiedSinceMs != null + && RestUtils.toSecondsPrecisionInMs(blobInfo.getBlobProperties().getCreationTimeInMs()) + <= ifModifiedSinceMs) { + responseChannel.setStatus(ResponseStatus.NotModified); + responseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, 0); + } else { + responseChannel.setHeader(RestUtils.Headers.LAST_MODIFIED, + new Date(blobInfo.getBlobProperties().getCreationTimeInMs())); + setGetBlobResponseHeaders(responseChannel, blobInfo); + } } else if (subResource == RestUtils.SubResource.BlobInfo) { + responseChannel.setHeader(RestUtils.Headers.LAST_MODIFIED, + new Date(blobInfo.getBlobProperties().getCreationTimeInMs())); setBlobPropertiesHeaders(blobInfo.getBlobProperties(), responseChannel); + } else if (subResource == RestUtils.SubResource.UserMetadata) { + responseChannel.setHeader(RestUtils.Headers.LAST_MODIFIED, + new Date(blobInfo.getBlobProperties().getCreationTimeInMs())); } } } catch (RestServiceException e) { @@ -115,6 +130,18 @@ public void close() { isOpen = false; } + /** + * Fetches the {@link RestUtils.Headers#IF_MODIFIED_SINCE} value in epoch time if present + * @param restRequest the {@link RestRequest} that needs to be parsed + * @return the {@link RestUtils.Headers#IF_MODIFIED_SINCE} value in epoch time if present + */ + private Long getIfModifiedSinceMs(RestRequest restRequest) { + if (restRequest.getArgs().get(RestUtils.Headers.IF_MODIFIED_SINCE) != null) { + return RestUtils.getTimeFromDateString((String) restRequest.getArgs().get(RestUtils.Headers.IF_MODIFIED_SINCE)); + } + return null; + } + /** * Sets the required headers in the HEAD response. * @param blobInfo the {@link BlobInfo} to refer to while setting headers. @@ -140,6 +167,9 @@ private void setGetBlobResponseHeaders(RestResponseChannel restResponseChannel, throws RestServiceException { BlobProperties blobProperties = blobInfo.getBlobProperties(); restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, blobProperties.getBlobSize()); + if (blobProperties.getBlobSize() < frontendConfig.frontendChunkedGetResponseThresholdInBytes) { + restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, blobProperties.getBlobSize()); + } if (blobProperties.getContentType() != null) { restResponseChannel.setHeader(RestUtils.Headers.CONTENT_TYPE, blobProperties.getContentType()); // Ensure browsers do not execute html with embedded exploits. @@ -153,8 +183,9 @@ private void setGetBlobResponseHeaders(RestResponseChannel restResponseChannel, restResponseChannel.setHeader(RestUtils.Headers.PRAGMA, "no-cache"); } else { restResponseChannel.setHeader(RestUtils.Headers.EXPIRES, - new Date(System.currentTimeMillis() + cacheValidityInSecs * Time.MsPerSec)); - restResponseChannel.setHeader(RestUtils.Headers.CACHE_CONTROL, "max-age=" + cacheValidityInSecs); + new Date(System.currentTimeMillis() + frontendConfig.frontendCacheValiditySeconds * Time.MsPerSec)); + restResponseChannel + .setHeader(RestUtils.Headers.CACHE_CONTROL, "max-age=" + frontendConfig.frontendCacheValiditySeconds); } } diff --git a/ambry-frontend/src/test/java/com.github.ambry.frontend/AmbryBlobStorageServiceTest.java b/ambry-frontend/src/test/java/com.github.ambry.frontend/AmbryBlobStorageServiceTest.java index 194694dbb1..90fa8bcacf 100644 --- a/ambry-frontend/src/test/java/com.github.ambry.frontend/AmbryBlobStorageServiceTest.java +++ b/ambry-frontend/src/test/java/com.github.ambry.frontend/AmbryBlobStorageServiceTest.java @@ -52,13 +52,17 @@ import java.lang.reflect.Method; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Properties; +import java.util.TimeZone; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -340,6 +344,7 @@ public void postGetHeadDeleteTest() RestUtilsTest.setUserMetadataHeaders(headers, userMetadata); String blobId = postBlobAndVerify(headers, content); getBlobAndVerify(blobId, headers, content); + getNotModifiedBlobAndVerify(blobId); getUserMetadataAndVerify(blobId, headers); getBlobInfoAndVerify(blobId, headers); getHeadAndVerify(blobId, headers); @@ -697,6 +702,31 @@ public void getBlobAndVerify(String blobId, JSONObject expectedHeaders, ByteBuff restResponseChannel.getResponseBody()); } + /** + * Gets the blob with blob ID {@code blobId} and verifies that the blob is not returned as blob is not modified + * @param blobId the blob ID of the blob to GET. + * @throws Exception + */ + public void getNotModifiedBlobAndVerify(String blobId) + throws Exception { + JSONObject headers = new JSONObject(); + SimpleDateFormat dateFormat = new SimpleDateFormat(RestUtils.HTTP_DATE_FORMAT, Locale.ENGLISH); + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + Date date = new Date(System.currentTimeMillis()); + String dateStr = dateFormat.format(date); + headers.put(RestUtils.Headers.IF_MODIFIED_SINCE, dateStr); + RestRequest restRequest = createRestRequest(RestMethod.GET, blobId, headers, null); + MockRestResponseChannel restResponseChannel = new MockRestResponseChannel(); + doOperation(restRequest, restResponseChannel); + assertEquals("Unexpected response status", ResponseStatus.NotModified, restResponseChannel.getResponseStatus()); + assertTrue("No Date header", restResponseChannel.getHeader(RestUtils.Headers.DATE) != null); + assertNull("No Last-Modified header expected", restResponseChannel.getHeader("Last-Modified")); + assertNull(RestUtils.Headers.BLOB_SIZE + " should have been null ", + restResponseChannel.getHeader(RestUtils.Headers.BLOB_SIZE)); + assertNull("Content-Type should have been null", restResponseChannel.getHeader(RestUtils.Headers.CONTENT_TYPE)); + assertEquals("No content expected as blob is not modified", 0, restResponseChannel.getResponseBody().length); + } + /** * Gets the user metadata of the blob with blob ID {@code blobId} and verifies them against what is expected. * @param blobId the blob ID of the blob to HEAD. diff --git a/ambry-frontend/src/test/java/com.github.ambry.frontend/AmbrySecurityServiceTest.java b/ambry-frontend/src/test/java/com.github.ambry.frontend/AmbrySecurityServiceTest.java index 4b5d3910db..41ca7ce762 100644 --- a/ambry-frontend/src/test/java/com.github.ambry.frontend/AmbrySecurityServiceTest.java +++ b/ambry-frontend/src/test/java/com.github.ambry.frontend/AmbrySecurityServiceTest.java @@ -34,8 +34,11 @@ import java.io.UnsupportedEncodingException; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Locale; import java.util.Properties; +import java.util.TimeZone; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,8 +54,7 @@ public class AmbrySecurityServiceTest { private SecurityService securityService; - private final long cacheValidityInSecs = - new FrontendConfig(new VerifiableProperties(new Properties())).frontendCacheValiditySeconds; + private final static FrontendConfig FRONTEND_CONFIG = new FrontendConfig(new VerifiableProperties(new Properties())); public AmbrySecurityServiceTest() throws InstantiationException { @@ -169,7 +171,9 @@ public void testProcessResponse() } // Head for a public blob with a diff TTL - blobInfo = new BlobInfo(getBlobProperties(100, false, 10000, "image/gif"), null); + blobInfo = new BlobInfo( + getBlobProperties(FRONTEND_CONFIG.frontendChunkedGetResponseThresholdInBytes - 1, false, 10000, "image/gif"), + null); callback.reset(); restResponseChannel = new MockRestResponseChannel(); restRequest = createRestRequest(RestMethod.HEAD, "/", null); @@ -192,6 +196,7 @@ public void testProcessResponse() // Get Blob testGetBlob(callback, blobInfo); + testGetNotModifiedBlob(callback, blobInfo); // Get blob for a private blob blobInfo = new BlobInfo(getBlobProperties(100, false, Utils.Infinite_Time, "image/gif"), null); testGetBlob(callback, blobInfo); @@ -205,6 +210,12 @@ public void testProcessResponse() testExceptionCasesProcessResponse(RestMethod.HEAD, callback, blobInfo); testExceptionCasesProcessResponse(RestMethod.GET, callback, blobInfo); + // test Get of a large blob + blobInfo = new BlobInfo( + getBlobProperties(FRONTEND_CONFIG.frontendChunkedGetResponseThresholdInBytes * 2, false, 10000, "image/gif"), + null); + testGetBlob(callback, blobInfo); + // security service closed callback = new SecurityServiceCallback(); securityService.close(); @@ -268,6 +279,31 @@ private void testGetBlob(SecurityServiceCallback callback, BlobInfo blobInfo) verifyHeadersForGetBlob(blobInfo.getBlobProperties(), restResponseChannel); } + /** + * Tests {@link SecurityService#processResponse(RestRequest, RestResponseChannel, BlobInfo, Callback)} for a Get blob + * with the passed in {@link BlobInfo} for a not modified response + * @param callback {@link Callback} to be used with the security service + * @param blobInfo the {@link BlobInfo} to be used for the {@link RestRequest} + * @throws Exception + */ + private void testGetNotModifiedBlob(SecurityServiceCallback callback, BlobInfo blobInfo) + throws Exception { + callback.reset(); + MockRestResponseChannel restResponseChannel = new MockRestResponseChannel(); + JSONObject headers = new JSONObject(); + SimpleDateFormat dateFormat = new SimpleDateFormat(RestUtils.HTTP_DATE_FORMAT, Locale.ENGLISH); + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + Date date = new Date(System.currentTimeMillis()); + String dateStr = dateFormat.format(date); + headers.put(RestUtils.Headers.IF_MODIFIED_SINCE, dateStr); + RestRequest restRequest = createRestRequest(RestMethod.GET, "/abc", headers); + securityService.processResponse(restRequest, restResponseChannel, blobInfo, callback).get(); + Assert.assertTrue("Call back should have been invoked", callback.callbackInvoked.get()); + Assert.assertNull("Exception should not have been thrown", callback.exception); + Assert.assertEquals("No body is expected in the response", 0, restResponseChannel.getResponseBody().length); + verifyHeadersForGetBlobNotModified(restResponseChannel); + } + /** * Tests exception cases for {@link SecurityService#processResponse(RestRequest, RestResponseChannel, BlobInfo, Callback)} * with a {@link BadRestResponseChannel} @@ -354,8 +390,13 @@ private void verifyHeadersForGetBlob(BlobProperties blobProperties, MockRestResp Assert.assertNull("TTL value should not be set", restResponseChannel.getHeader(RestUtils.Headers.TTL)); Assert.assertNull("ServiceID value should not be set", restResponseChannel.getHeader(RestUtils.Headers.SERVICE_ID)); Assert.assertNull("OwnerId value should not be set", restResponseChannel.getHeader(RestUtils.Headers.OWNER_ID)); - Assert.assertNull("Content length value should not be set", - restResponseChannel.getHeader(RestUtils.Headers.CONTENT_LENGTH)); + if (blobProperties.getBlobSize() < FRONTEND_CONFIG.frontendChunkedGetResponseThresholdInBytes) { + Assert.assertEquals("Content length value mismatch", blobProperties.getBlobSize(), + Integer.parseInt(restResponseChannel.getHeader(RestUtils.Headers.CONTENT_LENGTH))); + } else { + Assert.assertNull("Content length value should not be set", + restResponseChannel.getHeader(RestUtils.Headers.CONTENT_LENGTH)); + } Assert.assertNull("Ambry Content Type should not be set", restResponseChannel.getHeader(RestUtils.Headers.AMBRY_CONTENT_TYPE)); Assert.assertNull("CreationTime should not be set", restResponseChannel.getHeader(RestUtils.Headers.CREATION_TIME)); @@ -368,13 +409,32 @@ private void verifyHeadersForGetBlob(BlobProperties blobProperties, MockRestResp } else { Assert.assertNotSame("Expires value not as expected", new Date(0), restResponseChannel.getHeader(RestUtils.Headers.EXPIRES)); - Assert.assertEquals("Cache-Control value not as expected", "max-age=" + cacheValidityInSecs, + Assert.assertEquals("Cache-Control value not as expected", + "max-age=" + FRONTEND_CONFIG.frontendCacheValiditySeconds, restResponseChannel.getHeader(RestUtils.Headers.CACHE_CONTROL)); Assert .assertNull("Pragma value should not have been set", restResponseChannel.getHeader(RestUtils.Headers.PRAGMA)); } } + /** + * Verify the headers from the response for a Not modified blob are as expected + * @param restResponseChannel {@link MockRestResponseChannel} from which headers are to be verified + * @throws RestServiceException if there was any problem getting the headers. + */ + private void verifyHeadersForGetBlobNotModified(MockRestResponseChannel restResponseChannel) + throws RestServiceException { + Assert.assertNull("Blob Size should have been null", restResponseChannel.getHeader(RestUtils.Headers.BLOB_SIZE)); + Assert.assertNull("Content Type should have been null", + restResponseChannel.getHeader(RestUtils.Headers.CONTENT_TYPE)); + Assert.assertNull("Expires should have been null", restResponseChannel.getHeader(RestUtils.Headers.EXPIRES)); + Assert.assertNull("Cache control should have been null", + restResponseChannel.getHeader(RestUtils.Headers.CACHE_CONTROL)); + Assert.assertNull("Pragma should have been null", restResponseChannel.getHeader(RestUtils.Headers.PRAGMA)); + Assert.assertEquals("Content lenght should have been 0", "0", + restResponseChannel.getHeader(RestUtils.Headers.CONTENT_LENGTH)); + } + /** * Verify the headers from the response are as expected * @param blobProperties the {@link BlobProperties} to refer to while getting headers. diff --git a/ambry-frontend/src/test/java/com.github.ambry.frontend/FrontendIntegrationTest.java b/ambry-frontend/src/test/java/com.github.ambry.frontend/FrontendIntegrationTest.java index 281bf87d44..029a5a08d3 100644 --- a/ambry-frontend/src/test/java/com.github.ambry.frontend/FrontendIntegrationTest.java +++ b/ambry-frontend/src/test/java/com.github.ambry.frontend/FrontendIntegrationTest.java @@ -47,12 +47,16 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.text.SimpleDateFormat; import java.util.Arrays; +import java.util.Date; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.Queue; import java.util.concurrent.ExecutionException; +import junit.framework.Assert; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -118,31 +122,14 @@ public static void teardown() { /** * Tests blob POST, GET, HEAD and DELETE operations. - * @throws ExecutionException - * @throws InterruptedException + * @throws Exception */ @Test public void postGetHeadDeleteTest() - throws ExecutionException, InterruptedException { - ByteBuffer content = ByteBuffer.wrap(RestTestUtils.getRandomBytes(1024)); - String serviceId = "postGetHeadDeleteServiceID"; - String contentType = "application/octet-stream"; - String ownerId = "postGetHeadDeleteOwnerID"; - HttpHeaders headers = new DefaultHttpHeaders(); - setAmbryHeaders(headers, content.capacity(), 7200, false, serviceId, contentType, ownerId); - headers.set(HttpHeaders.Names.CONTENT_LENGTH, content.capacity()); - headers.add(RestUtils.Headers.USER_META_DATA_HEADER_PREFIX + "key1", "value1"); - headers.add(RestUtils.Headers.USER_META_DATA_HEADER_PREFIX + "key2", "value2"); - - String blobId = postBlobAndVerify(headers, content); - getBlobAndVerify(blobId, headers, content); - getUserMetadataAndVerify(blobId, headers, null); - getBlobInfoAndVerify(blobId, headers, null); - getHeadAndVerify(blobId, headers); - deleteBlobAndVerify(blobId); - - // check GET, HEAD and DELETE after delete. - verifyOperationsAfterDelete(blobId); + throws Exception { + doPostGetHeadDeleteTest(1024); + doPostGetHeadDeleteTest(8192); + doPostGetHeadDeleteTest(10000); } /** @@ -192,6 +179,35 @@ public void healtCheckRequestTest() // helpers // general + /** + * Utility to test blob POST, GET, HEAD and DELETE operations for a specified size + * @param contentSize the size of the blob to be tested + * @throws Exception + */ + private void doPostGetHeadDeleteTest(int contentSize) + throws Exception { + ByteBuffer content = ByteBuffer.wrap(RestTestUtils.getRandomBytes(contentSize)); + String serviceId = "postGetHeadDeleteServiceID"; + String contentType = "application/octet-stream"; + String ownerId = "postGetHeadDeleteOwnerID"; + HttpHeaders headers = new DefaultHttpHeaders(); + setAmbryHeaders(headers, content.capacity(), 7200, false, serviceId, contentType, ownerId); + headers.set(HttpHeaders.Names.CONTENT_LENGTH, content.capacity()); + headers.add(RestUtils.Headers.USER_META_DATA_HEADER_PREFIX + "key1", "value1"); + headers.add(RestUtils.Headers.USER_META_DATA_HEADER_PREFIX + "key2", "value2"); + + String blobId = postBlobAndVerify(headers, content); + getBlobAndVerify(blobId, headers, content); + getNotModifiedBlobAndVerify(blobId); + getUserMetadataAndVerify(blobId, headers, null); + getBlobInfoAndVerify(blobId, headers, null); + getHeadAndVerify(blobId, headers); + deleteBlobAndVerify(blobId); + + // check GET, HEAD and DELETE after delete. + verifyOperationsAfterDelete(blobId); + } + /** * Method to easily create a request. * @param httpMethod the {@link HttpMethod} desired. @@ -237,6 +253,21 @@ private ByteBuffer getContent(HttpResponse response, Queue contents) return buffer; } + /** + * Verifies that no content has been sent as part of the response or readable bytes is equivalent to 0 + * @param contents the content of the response. + */ + private void assertNoContent(Queue contents) { + boolean endMarkerFound = false; + for (HttpObject object : contents) { + assertFalse("There should have been no more data after the end marker was found", endMarkerFound); + HttpContent content = (HttpContent) object; + Assert.assertEquals("No content expected ", 0, content.content().readableBytes()); + endMarkerFound = object instanceof LastHttpContent; + ReferenceCountUtil.release(content); + } + } + /** * Discards all the content in {@code contents}. * @param contents the content to discard. @@ -353,6 +384,27 @@ private void getBlobAndVerify(String blobId, HttpHeaders expectedHeaders, ByteBu assertTrue("Channel should be active", HttpHeaders.isKeepAlive(response)); } + /** + * Gets the blob with blob ID {@code blobId} and verifies that the blob is not returned as blob is not modified + * @param blobId the blob ID of the blob to GET. + * @throws Exception + */ + private void getNotModifiedBlobAndVerify(String blobId) + throws Exception { + HttpHeaders headers = new DefaultHttpHeaders(); + headers.add(RestUtils.Headers.IF_MODIFIED_SINCE, new Date()); + FullHttpRequest httpRequest = buildRequest(HttpMethod.GET, blobId, headers, null); + Queue responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); + HttpResponse response = (HttpResponse) responseParts.poll(); + assertEquals("Unexpected response status", HttpResponseStatus.NOT_MODIFIED, response.getStatus()); + assertTrue("No Date header", response.headers().get(RestUtils.Headers.DATE) != null); + assertNull("No Last-Modified header expected", response.headers().get("Last-Modified")); + assertNull(RestUtils.Headers.BLOB_SIZE + " should have been null ", + response.headers().get(RestUtils.Headers.BLOB_SIZE)); + assertNull("Content-Type should have been null", response.headers().get(RestUtils.Headers.CONTENT_TYPE)); + assertNoContent(responseParts); + } + /** * Gets the user metadata of the blob with blob ID {@code blobId} and verifies them against what is expected. * @param blobId the blob ID of the blob to HEAD. diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/NettyResponseChannel.java b/ambry-rest/src/main/java/com.github.ambry.rest/NettyResponseChannel.java index 901a7e9f17..c7b7b96dfd 100644 --- a/ambry-rest/src/main/java/com.github.ambry.rest/NettyResponseChannel.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/NettyResponseChannel.java @@ -380,6 +380,9 @@ private HttpResponseStatus getHttpResponseStatus(ResponseStatus responseStatus) case Accepted: status = HttpResponseStatus.ACCEPTED; break; + case NotModified: + status = HttpResponseStatus.NOT_MODIFIED; + break; case BadRequest: nettyMetrics.badRequestCount.inc(); status = HttpResponseStatus.BAD_REQUEST; diff --git a/ambry-router/src/main/java/com.github.ambry.router/CoordinatorBackedRouter.java b/ambry-router/src/main/java/com.github.ambry.router/CoordinatorBackedRouter.java index c5a6d34bd1..e8f7fd5521 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/CoordinatorBackedRouter.java +++ b/ambry-router/src/main/java/com.github.ambry.router/CoordinatorBackedRouter.java @@ -230,6 +230,14 @@ protected void completeOperation(FutureResult futureResult, Callback callback, O } catch (Exception e) { metrics.futureCallbackError.inc(); logger.error("Exception caught during future and callback completion", e); + if (operationResult instanceof ReadableStreamChannel) { + try { + logger.trace("Closing ReadableStreamChannel due to exception"); + ((ReadableStreamChannel) operationResult).close(); + } catch (IOException channelCloseException) { + logger.error("Ignoring IOException during ReadableStreamChannel close"); + } + } } finally { metrics.operationPostProcessingTimeInMs.update(System.currentTimeMillis() - postProcessingStartTime); }