From 3f47a08803a8304ef192a22ab77477188bcae22e Mon Sep 17 00:00:00 2001 From: Ze Mao Date: Mon, 10 Feb 2020 15:37:10 -0800 Subject: [PATCH] Add Netty Based HTTP2 implementation for storage server. (#1352) Add AsyncWritableChannel interface for MessageReadSet and Send. Add StorageServerNettyFactory and StorageServerNettyChannelInitializer. Add HTTP2 availability in AmbryServer but it is disabled until HTTP2 port populated in Helix. Add ServerHttp2Test. --- .../java/com.github.ambry/network/Send.java | 2 +- .../rest/RestRequestHandlerFactory.java | 30 ----- .../java/com.github.ambry/rest/RestUtils.java | 1 + .../store/MessageReadSet.java | 12 +- .../CloudMessageReadSet.java | 9 ++ .../MockClusterMap.java | 23 ++-- .../TestSSLUtils.java | 11 ++ .../MessageFormatSend.java | 7 ++ .../BlobStoreHardDeleteTest.java | 7 ++ .../MessageFormatSendTest.java | 7 ++ .../NettyServerRequestResponseChannel.java | 17 ++- .../AdminResponse.java | 21 +++- .../CompositeSend.java | 36 ++++-- .../GetResponse.java | 28 ++++- .../com.github.ambry.protocol/Response.java | 16 ++- .../InMemoryStore.java | 7 ++ .../ReplicationTest.java | 6 +- .../Http2ClientChannelInitializer.java | 8 +- .../Http2ClientStreamInitializer.java | 2 +- .../Http2ResponseHandler.java | 47 +++++++ .../Http2StreamHandler.java | 52 ++++++++ .../StorageServerNettyChannelInitializer.java | 86 +++++++++++++ .../StorageServerNettyFactory.java | 81 ++++++++++++ .../NonBlockingRouter.java | 3 + .../com.github.ambry.router/PutManager.java | 1 + .../RequestRegistrationCallback.java | 1 + .../CloudAndStoreReplicationTest.java | 2 + .../RouterServerPlaintextTest.java | 6 +- .../RouterServerSSLTest.java | 1 + .../ServerHardDeleteTest.java | 2 + .../ServerHttp2Test.java | 84 +++++++++++++ .../ServerPlaintextTest.java | 6 +- .../ServerPlaintextTokenTest.java | 5 +- .../ServerSSLTest.java | 1 + .../ServerSSLTokenTest.java | 1 + .../VcrBackupTest.java | 5 +- .../com.github.ambry.server/AmbryServer.java | 53 ++++++++ .../Http2BlockingChannel.java | 33 +++-- .../Http2ResponseHandler.java | 118 ------------------ .../MockStorageManager.java | 7 ++ .../StoreMessageReadSet.java | 23 ++++ build.gradle | 3 +- config/server.properties | 5 + 43 files changed, 670 insertions(+), 206 deletions(-) delete mode 100644 ambry-api/src/main/java/com.github.ambry/rest/RestRequestHandlerFactory.java rename {ambry-server/src/test/java/com.github.ambry.server => ambry-rest/src/main/java/com.github.ambry.rest}/Http2ClientChannelInitializer.java (88%) rename {ambry-server/src/test/java/com.github.ambry.server => ambry-rest/src/main/java/com.github.ambry.rest}/Http2ClientStreamInitializer.java (97%) create mode 100644 ambry-rest/src/main/java/com.github.ambry.rest/Http2ResponseHandler.java create mode 100644 ambry-rest/src/main/java/com.github.ambry.rest/Http2StreamHandler.java create mode 100644 ambry-rest/src/main/java/com.github.ambry.rest/StorageServerNettyChannelInitializer.java create mode 100644 ambry-rest/src/main/java/com.github.ambry.rest/StorageServerNettyFactory.java create mode 100644 ambry-server/src/integration-test/java/com.github.ambry.server/ServerHttp2Test.java delete mode 100644 ambry-server/src/test/java/com.github.ambry.server/Http2ResponseHandler.java diff --git a/ambry-api/src/main/java/com.github.ambry/network/Send.java b/ambry-api/src/main/java/com.github.ambry/network/Send.java index 57e39bb6b1..539be5b1f8 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/Send.java +++ b/ambry-api/src/main/java/com.github.ambry/network/Send.java @@ -35,7 +35,7 @@ public interface Send { /** * Placeholder to support {@link AsyncWritableChannel} */ - default void writeTo(AsyncWritableChannel channel, Callback callback) throws IOException { + default void writeTo(AsyncWritableChannel channel, Callback callback) { return; } diff --git a/ambry-api/src/main/java/com.github.ambry/rest/RestRequestHandlerFactory.java b/ambry-api/src/main/java/com.github.ambry/rest/RestRequestHandlerFactory.java deleted file mode 100644 index e07d56e738..0000000000 --- a/ambry-api/src/main/java/com.github.ambry/rest/RestRequestHandlerFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Copyright 2016 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.rest; - -/** - * RestRequestHandlerFactory is a factory to generate all the supporting cast required to instantiate a - * {@link RestRequestHandler}. - *

- * Usually called with the canonical class name and as such might have to support appropriate (multiple) constructors. - */ -public interface RestRequestHandlerFactory { - - /** - * Returns an instance of the {@link RestRequestHandler} that the factory generates. - * @return an instance of {@link RestRequestHandler} generated by this factory. - * @throws InstantiationException if the {@link RestRequestHandler} instance cannot be created. - */ - public RestRequestHandler getRestRequestHandler() throws InstantiationException; -} diff --git a/ambry-api/src/main/java/com.github.ambry/rest/RestUtils.java b/ambry-api/src/main/java/com.github.ambry/rest/RestUtils.java index 2d6bf40a28..56cca9f2be 100644 --- a/ambry-api/src/main/java/com.github.ambry/rest/RestUtils.java +++ b/ambry-api/src/main/java/com.github.ambry/rest/RestUtils.java @@ -216,6 +216,7 @@ public static final class Headers { * Response header indicating the reason a request is non compliant. */ public final static String NON_COMPLIANCE_WARNING = "x-ambry-non-compliance-warning"; + } public static final class TrackingHeaders { diff --git a/ambry-api/src/main/java/com.github.ambry/store/MessageReadSet.java b/ambry-api/src/main/java/com.github.ambry/store/MessageReadSet.java index 04495f2fcb..abb4f1697d 100644 --- a/ambry-api/src/main/java/com.github.ambry/store/MessageReadSet.java +++ b/ambry-api/src/main/java/com.github.ambry/store/MessageReadSet.java @@ -13,6 +13,8 @@ */ package com.github.ambry.store; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import java.io.IOException; import java.nio.channels.WritableByteChannel; @@ -36,7 +38,15 @@ public interface MessageReadSet { long writeTo(int index, WritableByteChannel channel, long relativeOffset, long maxSize) throws IOException; /** - * Returns the total number of messages in this set + * This method is intend to write prefetched data from {@link MessageReadSet} to {@link AsyncWritableChannel}. Data + * should be ready in memory(no blocking call) before write to {@link AsyncWritableChannel} asynchronously. Callback is + * called when the entire batch of writes succeeds or fails. + * @param channel the channel into which the data needs to be written to + * @param callback The callback when data is fully wrote to the channel. + */ + void writeTo(AsyncWritableChannel channel, Callback callback); + + /** * @return The total number of messages in this set */ int count(); diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudMessageReadSet.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudMessageReadSet.java index 1137b436bc..4ffc63f222 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudMessageReadSet.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/CloudMessageReadSet.java @@ -14,6 +14,8 @@ package com.github.ambry.cloud; import com.github.ambry.commons.BlobId; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import com.github.ambry.store.MessageReadSet; import com.github.ambry.store.StoreException; import com.github.ambry.store.StoreKey; @@ -25,6 +27,7 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; /** @@ -66,6 +69,12 @@ public long writeTo(int index, WritableByteChannel channel, long relativeOffset, return written; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + // TODO: read from cloud based store and write to AsyncWritableChannel is needed in the future. + throw new UnsupportedOperationException(); + } + @Override public int count() { return blobReadInfoList.size(); diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java index 09a83cfef7..bcb4f6caca 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java @@ -44,6 +44,7 @@ public class MockClusterMap implements ClusterMap { public static final String SPECIAL_PARTITION_CLASS = "specialPartitionClass"; public static final int PLAIN_TEXT_PORT_START_NUMBER = 62000; public static final int SSL_PORT_START_NUMBER = 63000; + public static final int HTTP2_PORT_START_NUMBER = 64000; protected final boolean enableSSLPorts; protected final Map partitions; @@ -117,6 +118,7 @@ public MockClusterMap(boolean enableSSLPorts, int numNodes, int numMountPointsPe String dcName = null; int currentPlainTextPort = PLAIN_TEXT_PORT_START_NUMBER; int currentSSLPort = SSL_PORT_START_NUMBER; + int currentHttp2Port = HTTP2_PORT_START_NUMBER; for (int i = 0; i < numNodes; i++) { if (i % 3 == 0) { dcIndex++; @@ -126,9 +128,11 @@ public MockClusterMap(boolean enableSSLPorts, int numNodes, int numMountPointsPe MockDataNodeId dataNodeId; if (enableSSLPorts) { dataNodeId = - createDataNode(getListOfPorts(currentPlainTextPort++, currentSSLPort++), dcName, numMountPointsPerNode); + createDataNode(getListOfPorts(currentPlainTextPort++, currentSSLPort++, currentHttp2Port++), dcName, + numMountPointsPerNode); } else { - dataNodeId = createDataNode(getListOfPorts(currentPlainTextPort++), dcName, numMountPointsPerNode); + dataNodeId = createDataNode(getListOfPorts(currentPlainTextPort++, null, currentHttp2Port++), dcName, + numMountPointsPerNode); } dataNodes.add(dataNodeId); dcToDataNodes.computeIfAbsent(dcName, name -> new ArrayList<>()).add(dataNodeId); @@ -244,16 +248,15 @@ public static MockClusterMap createOneNodeRecoveryClusterMap(MockDataNodeId reco return new MockClusterMap(recoveryNode, vcrNode, dcName); } - protected ArrayList getListOfPorts(int port) { - ArrayList ports = new ArrayList<>(); - ports.add(new Port(port, PortType.PLAINTEXT)); - return ports; - } - - public static ArrayList getListOfPorts(int port, int sslPort) { + public static ArrayList getListOfPorts(int port, Integer sslPort, Integer http2Port) { ArrayList ports = new ArrayList(); ports.add(new Port(port, PortType.PLAINTEXT)); - ports.add(new Port(sslPort, PortType.SSL)); + if (sslPort != null) { + ports.add(new Port(sslPort, PortType.SSL)); + } + if (http2Port != null) { + ports.add(new Port(http2Port, PortType.HTTP2)); + } return ports; } diff --git a/ambry-commons/src/test/java/com.github.ambry.commons/TestSSLUtils.java b/ambry-commons/src/test/java/com.github.ambry.commons/TestSSLUtils.java index 4795ea4e25..b60d612030 100644 --- a/ambry-commons/src/test/java/com.github.ambry.commons/TestSSLUtils.java +++ b/ambry-commons/src/test/java/com.github.ambry.commons/TestSSLUtils.java @@ -180,6 +180,17 @@ public static void addSSLProperties(Properties props, String sslEnabledDatacente addSSLProperties(props, sslEnabledDatacenters, mode, trustStoreFile, certAlias, SSL_CONTEXT_PROVIDER); } + /** + * Setup HTTP2 server related properties. + * @param properties the {@link Properties} instance. + */ + public static void addHttp2Properties(Properties properties) { + properties.setProperty("rest.server.rest.request.service.factory", + "com.github.ambry.server.StorageRestRequestService"); + properties.setProperty("rest.server.nio.server.factory", "com.github.ambry.rest.StorageServerNettyFactory"); + properties.setProperty("ssl.client.authentication", "none"); + } + /** * Generate a cert and add SSL related properties to {@code props} * @param props the {@link Properties} instance. diff --git a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/MessageFormatSend.java b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/MessageFormatSend.java index c413079b34..3ec0cbbe66 100644 --- a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/MessageFormatSend.java +++ b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/MessageFormatSend.java @@ -14,6 +14,8 @@ package com.github.ambry.messageformat; import com.github.ambry.network.Send; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import com.github.ambry.store.MessageReadSet; import com.github.ambry.store.StoreKey; import com.github.ambry.store.StoreKeyFactory; @@ -266,6 +268,11 @@ public long writeTo(WritableByteChannel channel) throws IOException { return written; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + readSet.writeTo(channel, callback); + } + @Override public boolean isSendComplete() { return totalSizeToWrite == sizeWritten; diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/BlobStoreHardDeleteTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/BlobStoreHardDeleteTest.java index 820219a029..8674134e41 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/BlobStoreHardDeleteTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/BlobStoreHardDeleteTest.java @@ -13,6 +13,8 @@ */ package com.github.ambry.messageformat; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import com.github.ambry.store.HardDeleteInfo; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MessageReadSet; @@ -286,6 +288,11 @@ public long writeTo(int index, WritableByteChannel channel, long relativeOffset, return channel.write(ByteBuffer.wrap(toReturn)); } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + + } + @Override public int count() { return messageList.size(); diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java index 469cf70921..af3fa188a1 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java @@ -14,6 +14,8 @@ package com.github.ambry.messageformat; import com.codahale.metrics.MetricRegistry; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import com.github.ambry.store.MessageReadSet; import com.github.ambry.store.MockId; import com.github.ambry.store.MockIdFactory; @@ -86,6 +88,11 @@ public long writeTo(int index, WritableByteChannel channel, long relativeOffset, return written; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + + } + @Override public int count() { return buffers.size(); diff --git a/ambry-network/src/main/java/com.github.ambry.network/NettyServerRequestResponseChannel.java b/ambry-network/src/main/java/com.github.ambry.network/NettyServerRequestResponseChannel.java index 7ab855f79c..d4f488f36a 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NettyServerRequestResponseChannel.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NettyServerRequestResponseChannel.java @@ -15,6 +15,8 @@ import com.github.ambry.rest.RestResponseChannel; import com.github.ambry.rest.RestUtils; +import com.github.ambry.router.Callback; +import java.io.DataInputStream; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; @@ -32,6 +34,14 @@ public NettyServerRequestResponseChannel(int queueSize) { /** Send a request to be handled, potentially blocking until there is room in the queue for the request */ @Override public void sendRequest(NetworkRequest request) throws InterruptedException { + DataInputStream stream = new DataInputStream(request.getInputStream()); + try { + // The first 8 bytes is size of the request. TCP implementation uses this size to allocate buffer. See {@link BoundedReceive} + // Here we just need to consume it. + stream.readLong(); + } catch (IOException e) { + throw new IllegalStateException("stream read error." + e); + } requestQueue.put(request); } @@ -46,11 +56,8 @@ public void sendResponse(Send payloadToSend, NetworkRequest originalRequest, Ser RestResponseChannel restResponseChannel = ((NettyServerRequest) originalRequest).getRestResponseChannel(); restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, payloadToSend.sizeInBytes()); - try { - payloadToSend.writeTo(restResponseChannel, null); // an extra copy - } catch (IOException e) { - throw new InterruptedException(e.toString()); - } + payloadToSend.writeTo(restResponseChannel, (result, exception) -> { + });// an extra copy } /** diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/AdminResponse.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/AdminResponse.java index e72aa57b72..8df5dacb56 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/AdminResponse.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/AdminResponse.java @@ -13,6 +13,8 @@ */ package com.github.ambry.protocol; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import com.github.ambry.server.ServerErrorCode; import com.github.ambry.utils.Utils; import java.io.DataInputStream; @@ -59,15 +61,30 @@ public static AdminResponse readFrom(DataInputStream stream) throws IOException return new AdminResponse(correlationId, clientId, error); } - @Override - public long writeTo(WritableByteChannel channel) throws IOException { + /** + * A private method shared by {@link AdminResponse#writeTo(WritableByteChannel)} and + * {@link AdminResponse#writeTo(AsyncWritableChannel, Callback)}. + * This method allocate bufferToSend and write headers to it if bufferToSend is null. + */ + private void prepareBufferToSend() { if (bufferToSend == null) { serializeIntoBuffer(); bufferToSend.flip(); } + } + + @Override + public long writeTo(WritableByteChannel channel) throws IOException { + prepareBufferToSend(); return bufferToSend.hasRemaining() ? channel.write(bufferToSend) : 0; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + prepareBufferToSend(); + channel.write(bufferToSend, callback); + } + @Override public boolean isSendComplete() { return bufferToSend != null && bufferToSend.remaining() == 0; diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/CompositeSend.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/CompositeSend.java index c3a874cbd2..19eadedb12 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/CompositeSend.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/CompositeSend.java @@ -14,6 +14,8 @@ package com.github.ambry.protocol; import com.github.ambry.network.Send; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import java.io.IOException; import java.nio.channels.WritableByteChannel; import java.util.List; @@ -24,14 +26,14 @@ */ public class CompositeSend implements Send { - private final List compositSendList; + private final List compositeSendList; private long totalSizeToWrite; private int currentIndexInProgress; - public CompositeSend(List compositSendList) { - this.compositSendList = compositSendList; + public CompositeSend(List compositeSendList) { + this.compositeSendList = compositeSendList; this.currentIndexInProgress = 0; - for (Send messageFormatSend : compositSendList) { + for (Send messageFormatSend : compositeSendList) { totalSizeToWrite += messageFormatSend.sizeInBytes(); } } @@ -39,18 +41,36 @@ public CompositeSend(List compositSendList) { @Override public long writeTo(WritableByteChannel channel) throws IOException { long written = 0; - if (currentIndexInProgress < compositSendList.size()) { - written = compositSendList.get(currentIndexInProgress).writeTo(channel); - if (compositSendList.get(currentIndexInProgress).isSendComplete()) { + if (currentIndexInProgress < compositeSendList.size()) { + written = compositeSendList.get(currentIndexInProgress).writeTo(channel); + if (compositeSendList.get(currentIndexInProgress).isSendComplete()) { currentIndexInProgress++; } } return written; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + int lastIndex = compositeSendList.size() - 1; + int i = 0; + // This callback technically won't be set to the correct value since it will only reflect the size of the last send, + // not all sends in the batch. This may not currently be a problem but is something to look out for. + for (Send send : compositeSendList) { + if (i == lastIndex) { + // only the last one pass in callback + send.writeTo(channel, callback); + } else { + //TODO: stop writing to the channel whenever there is an exception here and stop the for loop. + send.writeTo(channel, null); + } + i++; + } + } + @Override public boolean isSendComplete() { - return currentIndexInProgress == compositSendList.size(); + return currentIndexInProgress == compositeSendList.size(); } @Override diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/GetResponse.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/GetResponse.java index 70f9358916..06fd7ff500 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/GetResponse.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/GetResponse.java @@ -14,8 +14,10 @@ package com.github.ambry.protocol; import com.github.ambry.clustermap.ClusterMap; -import com.github.ambry.server.ServerErrorCode; import com.github.ambry.network.Send; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; +import com.github.ambry.server.ServerErrorCode; import com.github.ambry.utils.Utils; import java.io.DataInputStream; import java.io.IOException; @@ -107,9 +109,12 @@ public static GetResponse readFrom(DataInputStream stream, ClusterMap map) throw } } - @Override - public long writeTo(WritableByteChannel channel) throws IOException { - long written = 0; + /** + * A private method shared by {@link GetResponse#writeTo(WritableByteChannel)} and + * {@link GetResponse#writeTo(AsyncWritableChannel, Callback)}. + * This method allocate bufferToSend and write metadata to it if bufferToSend is null. + */ + private void prepareBufferToSend() { if (bufferToSend == null) { bufferToSend = ByteBuffer.allocate( (int) super.sizeInBytes() + (Partition_Response_Info_List_Size + partitionResponseInfoSize)); @@ -122,6 +127,12 @@ public long writeTo(WritableByteChannel channel) throws IOException { } bufferToSend.flip(); } + } + + @Override + public long writeTo(WritableByteChannel channel) throws IOException { + prepareBufferToSend(); + long written = 0; if (bufferToSend.remaining() > 0) { written = channel.write(bufferToSend); } @@ -131,6 +142,15 @@ public long writeTo(WritableByteChannel channel) throws IOException { return written; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + prepareBufferToSend(); + channel.write(bufferToSend, callback); + if (toSend != null) { + toSend.writeTo(channel, callback); + } + } + @Override public boolean isSendComplete() { return (super.isSendComplete()) && (toSend == null || toSend.isSendComplete()); diff --git a/ambry-protocol/src/main/java/com.github.ambry.protocol/Response.java b/ambry-protocol/src/main/java/com.github.ambry.protocol/Response.java index 48d50f848f..8d38b93707 100644 --- a/ambry-protocol/src/main/java/com.github.ambry.protocol/Response.java +++ b/ambry-protocol/src/main/java/com.github.ambry.protocol/Response.java @@ -13,6 +13,8 @@ */ package com.github.ambry.protocol; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import com.github.ambry.server.ServerErrorCode; import java.io.IOException; import java.nio.ByteBuffer; @@ -42,16 +44,26 @@ protected void writeHeader() { bufferToSend.putShort((short) error.ordinal()); } - @Override - public long writeTo(WritableByteChannel channel) throws IOException { + private void prepareBuffer() { if (bufferToSend == null) { bufferToSend = ByteBuffer.allocate((int) sizeInBytes()); writeHeader(); bufferToSend.flip(); } + } + + @Override + public long writeTo(WritableByteChannel channel) throws IOException { + prepareBuffer(); return bufferToSend.remaining() > 0 ? channel.write(bufferToSend) : 0; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + prepareBuffer(); + channel.write(bufferToSend, callback); + } + @Override public boolean isSendComplete() { return (bufferToSend == null || bufferToSend.remaining() == 0); diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java index e0f4a7fad0..716347958d 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/InMemoryStore.java @@ -15,6 +15,8 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaState; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import com.github.ambry.store.FindInfo; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MessageReadSet; @@ -74,6 +76,11 @@ public long writeTo(int index, WritableByteChannel channel, long relativeOffset, return sizeToWrite; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + + } + @Override public int count() { return buffers.size(); diff --git a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java index 62b09da708..e697ffcc56 100644 --- a/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java +++ b/ambry-replication/src/test/java/com.github.ambry.replication/ReplicationTest.java @@ -329,9 +329,9 @@ public void onReplicaAddedOrRemovedCallbackTest() throws Exception { ReplicaId peerReplicaToRemove = existingReplica.getPartitionId().getReplicaIds().stream().filter(r -> r != existingReplica).findFirst().get(); // create a new node and place a peer of existing replica on it. - MockDataNodeId remoteNode = - createDataNode(getListOfPorts(PLAIN_TEXT_PORT_START_NUMBER + 10, SSL_PORT_START_NUMBER + 10), - clusterMap.getDatacenterName((byte) 0), 3); + MockDataNodeId remoteNode = createDataNode( + getListOfPorts(PLAIN_TEXT_PORT_START_NUMBER + 10, SSL_PORT_START_NUMBER + 10, HTTP2_PORT_START_NUMBER + 10), + clusterMap.getDatacenterName((byte) 0), 3); ReplicaId addedReplica = new MockReplicaId(remoteNode.getPort(), (MockPartitionId) existingReplica.getPartitionId(), remoteNode, 0); // populate added replica and removed replica lists diff --git a/ambry-server/src/test/java/com.github.ambry.server/Http2ClientChannelInitializer.java b/ambry-rest/src/main/java/com.github.ambry.rest/Http2ClientChannelInitializer.java similarity index 88% rename from ambry-server/src/test/java/com.github.ambry.server/Http2ClientChannelInitializer.java rename to ambry-rest/src/main/java/com.github.ambry.rest/Http2ClientChannelInitializer.java index ab905baee4..05edea3ca7 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/Http2ClientChannelInitializer.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/Http2ClientChannelInitializer.java @@ -12,16 +12,16 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package com.github.ambry.server; +package com.github.ambry.rest; import com.github.ambry.commons.SSLFactory; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http2.Http2FrameCodecBuilder; import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.ssl.SslHandler; @@ -29,7 +29,7 @@ * A {@link ChannelInitializer} to be used with {@link Http2BlockingChannel}. Calling {@link #initChannel(SocketChannel)} * adds the necessary handlers to a channel's pipeline so that it may handle requests. */ -class Http2ClientChannelInitializer extends ChannelInitializer { +public class Http2ClientChannelInitializer extends ChannelInitializer { private final SSLFactory sslFactory; private final String host; private final int port; @@ -53,7 +53,7 @@ protected void initChannel(SocketChannel ch) throws Exception { } SslHandler sslHandler = new SslHandler(sslFactory.createSSLEngine(host, port, SSLFactory.Mode.CLIENT)); pipeline.addLast(sslHandler); - pipeline.addLast(Http2FrameCodecBuilder.forClient().build()); + pipeline.addLast(Http2FrameCodecBuilder.forClient().initialSettings(Http2Settings.defaultSettings()).build()); pipeline.addLast(new Http2MultiplexHandler(new ChannelInboundHandlerAdapter())); } } diff --git a/ambry-server/src/test/java/com.github.ambry.server/Http2ClientStreamInitializer.java b/ambry-rest/src/main/java/com.github.ambry.rest/Http2ClientStreamInitializer.java similarity index 97% rename from ambry-server/src/test/java/com.github.ambry.server/Http2ClientStreamInitializer.java rename to ambry-rest/src/main/java/com.github.ambry.rest/Http2ClientStreamInitializer.java index acf81b99a0..9dd3de2bc6 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/Http2ClientStreamInitializer.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/Http2ClientStreamInitializer.java @@ -12,7 +12,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ -package com.github.ambry.server; +package com.github.ambry.rest; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/Http2ResponseHandler.java b/ambry-rest/src/main/java/com.github.ambry.rest/Http2ResponseHandler.java new file mode 100644 index 0000000000..17d7f967d7 --- /dev/null +++ b/ambry-rest/src/main/java/com.github.ambry.rest/Http2ResponseHandler.java @@ -0,0 +1,47 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package com.github.ambry.rest; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http2.HttpConversionUtil; +import io.netty.util.AttributeKey; +import io.netty.util.concurrent.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Process {@link io.netty.handler.codec.http.FullHttpResponse} translated from HTTP/2 frames + */ +@ChannelHandler.Sharable +public class Http2ResponseHandler extends SimpleChannelInboundHandler { + public final static AttributeKey> RESPONSE_PROMISE = AttributeKey.newInstance("ResponsePromise"); + private final static Logger logger = LoggerFactory.getLogger(Http2ResponseHandler.class); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception { + Integer streamId = msg.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text()); + if (streamId == null) { + logger.error("Http2ResponseHandler unexpected message received: " + msg); + return; + } + logger.trace("Stream response received."); + ctx.channel().attr(RESPONSE_PROMISE).getAndSet(null).setSuccess(msg.content().retainedDuplicate()); + } +} diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/Http2StreamHandler.java b/ambry-rest/src/main/java/com.github.ambry.rest/Http2StreamHandler.java new file mode 100644 index 0000000000..618b48ff9f --- /dev/null +++ b/ambry-rest/src/main/java/com.github.ambry.rest/Http2StreamHandler.java @@ -0,0 +1,52 @@ +/** + * Copyright 2020 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.rest; + +import com.github.ambry.config.NettyConfig; +import com.github.ambry.config.PerformanceConfig; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; +import io.netty.handler.stream.ChunkedWriteHandler; + + +/* + * HTTP2 stream handler for each stream. + */ +@ChannelHandler.Sharable +public class Http2StreamHandler extends ChannelInboundHandlerAdapter { + + private NettyMetrics nettyMetrics; + private NettyConfig nettyConfig; + private PerformanceConfig performanceConfig; + private RestRequestHandler requestHandler; + + public Http2StreamHandler(NettyMetrics nettyMetrics, NettyConfig nettyConfig, PerformanceConfig performanceConfig, + RestRequestHandler requestHandler) { + this.nettyMetrics = nettyMetrics; + this.nettyConfig = nettyConfig; + this.performanceConfig = performanceConfig; + this.requestHandler = requestHandler; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + ctx.pipeline().addLast(new Http2StreamFrameToHttpObjectCodec(true)); + // NettyMessageProcessor depends on ChunkedWriteHandler. + // TODO: add deployment health check handler. + ctx.pipeline().addLast(new ChunkedWriteHandler()); + ctx.pipeline().addLast(new NettyMessageProcessor(nettyMetrics, nettyConfig, performanceConfig, requestHandler)); + } +} diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/StorageServerNettyChannelInitializer.java b/ambry-rest/src/main/java/com.github.ambry.rest/StorageServerNettyChannelInitializer.java new file mode 100644 index 0000000000..7d0ce06800 --- /dev/null +++ b/ambry-rest/src/main/java/com.github.ambry.rest/StorageServerNettyChannelInitializer.java @@ -0,0 +1,86 @@ +/* + * Copyright 2017 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.github.ambry.rest; + +import com.codahale.metrics.MetricRegistry; +import com.github.ambry.commons.SSLFactory; +import com.github.ambry.config.NettyConfig; +import com.github.ambry.config.PerformanceConfig; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.ssl.SslHandler; +import java.net.InetSocketAddress; +import java.util.Objects; + + +/** + * A {@link ChannelInitializer} to be used with {@link StorageServerNettyFactory}. Calling {@link #initChannel(SocketChannel)} + * adds the necessary handlers to a channel's pipeline so that it may handle requests. + */ +public class StorageServerNettyChannelInitializer extends ChannelInitializer { + private final NettyConfig nettyConfig; + private final PerformanceConfig performanceConfig; + private final NettyMetrics nettyMetrics; + private final ConnectionStatsHandler connectionStatsHandler; + private final RestRequestHandler requestHandler; + private final SSLFactory sslFactory; + + /** + * Construct a {@link StorageServerNettyChannelInitializer}. + * @param nettyConfig the config to use when instantiating certain handlers on this pipeline. + * @param performanceConfig the config to use when evaluating ambry service level objectives that include latency. + * @param nettyMetrics the {@link NettyMetrics} object to use. + * @param connectionStatsHandler the {@link ConnectionStatsHandler} to use. + * @param requestHandler the {@link RestRequestHandler} to handle requests on this pipeline. + * @param sslFactory the {@link SSLFactory} to use for generating {@link javax.net.ssl.SSLEngine} instances, + * or {@code null} if SSL is not enabled in this pipeline. + */ + public StorageServerNettyChannelInitializer(NettyConfig nettyConfig, PerformanceConfig performanceConfig, + NettyMetrics nettyMetrics, ConnectionStatsHandler connectionStatsHandler, RestRequestHandler requestHandler, + SSLFactory sslFactory, MetricRegistry metricRegistry) { + this.nettyConfig = nettyConfig; + this.performanceConfig = performanceConfig; + this.nettyMetrics = nettyMetrics; + // For http2, SSL encrypted is required. sslFactory should not be null. + Objects.requireNonNull(sslFactory); + this.sslFactory = sslFactory; + this.connectionStatsHandler = connectionStatsHandler; + RestRequestMetricsTracker.setDefaults(metricRegistry); + this.requestHandler = requestHandler; + } + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + // If channel handler implementations are not annotated with @Sharable, Netty creates a new instance of every class + // in the pipeline for every connection. + // i.e. if there are a 1000 active connections there will be a 1000 NettyMessageProcessor instances. + ChannelPipeline pipeline = ch.pipeline(); + // connection stats handler to track connection related metrics + pipeline.addLast("connectionStatsHandler", connectionStatsHandler); + InetSocketAddress peerAddress = ch.remoteAddress(); + String peerHost = peerAddress.getHostName(); + int peerPort = peerAddress.getPort(); + SslHandler sslHandler = new SslHandler(sslFactory.createSSLEngine(peerHost, peerPort, SSLFactory.Mode.SERVER)); + pipeline.addLast("SslHandler", sslHandler); + pipeline.addLast(Http2FrameCodecBuilder.forServer().initialSettings(Http2Settings.defaultSettings()).build()) + .addLast("Http2MultiplexHandler", new Http2MultiplexHandler( + new Http2StreamHandler(nettyMetrics, nettyConfig, performanceConfig, requestHandler))); + } +} + diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/StorageServerNettyFactory.java b/ambry-rest/src/main/java/com.github.ambry.rest/StorageServerNettyFactory.java new file mode 100644 index 0000000000..16544fd0e2 --- /dev/null +++ b/ambry-rest/src/main/java/com.github.ambry.rest/StorageServerNettyFactory.java @@ -0,0 +1,81 @@ +/** + * Copyright 2016 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.rest; + +import com.codahale.metrics.MetricRegistry; +import com.github.ambry.commons.SSLFactory; +import com.github.ambry.config.NettyConfig; +import com.github.ambry.config.PerformanceConfig; +import com.github.ambry.config.VerifiableProperties; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import java.util.Collections; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Netty specific implementation of {@link NioServerFactory}. + *

+ * Sets up all the supporting cast required for the operation of {@link NettyServer} and returns a new instance on + * {@link #getNioServer()}. + */ +public class StorageServerNettyFactory implements NioServerFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(StorageServerNettyFactory.class); + + private final NettyConfig nettyConfig; + private final PerformanceConfig performanceConfig; + private final NettyMetrics nettyMetrics; + final Map> channelInitializers; + + /** + * Creates a new instance of StorageServerNettyFactory. + * @param http2Port the port for HTTP2 request. + * @param verifiableProperties the in-memory {@link VerifiableProperties} to use. + * @param metricRegistry the {@link MetricRegistry} to use. + * @param requestHandler the {@link RestRequestHandler} to hand off the requests to. + * @param restServerState the {@link RestServerState} that can be used to check the health of the system + * to respond to health check requests + * @param sslFactory the {@link SSLFactory} used to construct the {@link javax.net.ssl.SSLEngine} used for + * handling http2 requests. + * @throws IllegalArgumentException if any of the arguments are null. + */ + public StorageServerNettyFactory(int http2Port, VerifiableProperties verifiableProperties, + MetricRegistry metricRegistry, final RestRequestHandler requestHandler, final RestServerState restServerState, + SSLFactory sslFactory) { + if (verifiableProperties == null || metricRegistry == null || requestHandler == null || restServerState == null + || sslFactory == null) { + throw new IllegalArgumentException("Null arg(s) received during instantiation of StorageServerNettyFactory"); + } + nettyConfig = new NettyConfig(verifiableProperties); + performanceConfig = new PerformanceConfig(verifiableProperties); + nettyMetrics = new NettyMetrics(metricRegistry); + ConnectionStatsHandler connectionStatsHandler = new ConnectionStatsHandler(nettyMetrics); + + Map> initializers = Collections.singletonMap(http2Port, + new StorageServerNettyChannelInitializer(nettyConfig, performanceConfig, nettyMetrics, connectionStatsHandler, + requestHandler, sslFactory, metricRegistry)); + channelInitializers = Collections.unmodifiableMap(initializers); + } + + /** + * Returns a new instance of {@link NettyServer}. + * @return a new instance of {@link NettyServer}. + */ + @Override + public NioServer getNioServer() { + return new NettyServer(nettyConfig, nettyMetrics, channelInitializers); + } +} diff --git a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java index 6326e65438..30c6d027b9 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java +++ b/ambry-router/src/main/java/com.github.ambry.router/NonBlockingRouter.java @@ -888,6 +888,9 @@ public void run() { List requestsToSend = new ArrayList<>(); Set requestsToDrop = new HashSet<>(); pollForRequests(requestsToSend, requestsToDrop); + + + List responseInfoList = networkClient.sendAndPoll(requestsToSend, routerConfig.routerDropRequestOnTimeout ? requestsToDrop : Collections.emptySet(), NETWORK_CLIENT_POLL_TIMEOUT); diff --git a/ambry-router/src/main/java/com.github.ambry.router/PutManager.java b/ambry-router/src/main/java/com.github.ambry.router/PutManager.java index 9bbfa04ee3..7d2084523a 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/PutManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/PutManager.java @@ -143,6 +143,7 @@ void submitPutBlobOperation(BlobProperties blobProperties, byte[] userMetaData, PutOperation.forUpload(routerConfig, routerMetrics, clusterMap, notificationSystem, accountService, userMetaData, channel, options, futureResult, callback, routerCallback, chunkArrivalListener, kms, cryptoService, cryptoJobHandler, time, blobProperties, partitionClass); + // TODO: netty send this request putOperations.add(putOperation); putOperation.startOperation(); } diff --git a/ambry-router/src/main/java/com.github.ambry.router/RequestRegistrationCallback.java b/ambry-router/src/main/java/com.github.ambry.router/RequestRegistrationCallback.java index 50ad6fe99d..348dbb2a95 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/RequestRegistrationCallback.java +++ b/ambry-router/src/main/java/com.github.ambry.router/RequestRegistrationCallback.java @@ -72,6 +72,7 @@ void setRequestsToDrop(Set requestsToDrop) { * @param requestInfo the request to send out. */ void registerRequestToSend(T routerOperation, RequestInfo requestInfo) { + // TODO: netty send here>? if (requestsToSend != null) { requestsToSend.add(requestInfo); } diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/CloudAndStoreReplicationTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/CloudAndStoreReplicationTest.java index 6c940ea370..74f1bfd1ef 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/CloudAndStoreReplicationTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/CloudAndStoreReplicationTest.java @@ -25,6 +25,7 @@ import com.github.ambry.clustermap.MockDataNodeId; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.commons.BlobId; +import com.github.ambry.commons.TestSSLUtils; import com.github.ambry.config.ReplicationConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.messageformat.BlobProperties; @@ -120,6 +121,7 @@ public void setup() throws Exception { if (!vcrRecoveryPartitionConfig.isEmpty()) { recoveryProperties.setProperty("vcr.recovery.partitions", vcrRecoveryPartitionConfig); } + TestSSLUtils.addHttp2Properties(recoveryProperties); // create vcr node List vcrPortList = Arrays.asList(new Port(12310, PortType.PLAINTEXT), new Port(12410, PortType.SSL)); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerPlaintextTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerPlaintextTest.java index d64723c971..cef9fbad1b 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerPlaintextTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerPlaintextTest.java @@ -18,6 +18,7 @@ import com.github.ambry.account.Account; import com.github.ambry.account.Container; import com.github.ambry.clustermap.MockClusterMap; +import com.github.ambry.commons.TestSSLUtils; import com.github.ambry.server.RouterServerTestFramework.*; import com.github.ambry.utils.SystemTime; import java.io.IOException; @@ -74,7 +75,10 @@ public RouterServerPlaintextTest(boolean testEncryption) { @BeforeClass public static void initializeTests() throws Exception { Properties properties = getRouterProperties("DC1"); - plaintextCluster = new MockCluster(new Properties(), false, SystemTime.getInstance()); + + Properties serverProperties = new Properties(); + TestSSLUtils.addHttp2Properties(serverProperties); + plaintextCluster = new MockCluster(serverProperties, false, SystemTime.getInstance()); MockNotificationSystem notificationSystem = new MockNotificationSystem(plaintextCluster.getClusterMap()); plaintextCluster.initializeServers(notificationSystem); plaintextCluster.startServers(); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerSSLTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerSSLTest.java index f318959ede..bc4df60527 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerSSLTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerSSLTest.java @@ -74,6 +74,7 @@ public static void initializeTests() throws Exception { Properties serverSSLProps = new Properties(); TestSSLUtils.addSSLProperties(serverSSLProps, sslEnabledDataCentersStr, SSLFactory.Mode.SERVER, trustStoreFile, "server"); + TestSSLUtils.addHttp2Properties(serverSSLProps); Properties routerProps = getRouterProperties("DC1"); TestSSLUtils.addSSLProperties(routerProps, sslEnabledDataCentersStr, SSLFactory.Mode.CLIENT, trustStoreFile, "router-client"); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java index 425bd8cabd..dc0d8023f1 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java @@ -19,6 +19,7 @@ import com.github.ambry.clustermap.PartitionId; import com.github.ambry.commons.BlobId; import com.github.ambry.commons.CommonTestUtils; +import com.github.ambry.commons.TestSSLUtils; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.messageformat.BlobData; import com.github.ambry.messageformat.BlobProperties; @@ -94,6 +95,7 @@ public void initialize() throws Exception { props.setProperty("clustermap.datacenter.name", "DC1"); props.setProperty("clustermap.host.name", "localhost"); props.setProperty("clustermap.default.partition.class", MockClusterMap.DEFAULT_PARTITION_CLASS); + TestSSLUtils.addHttp2Properties(props); VerifiableProperties propverify = new VerifiableProperties(props); server = new AmbryServer(propverify, mockClusterAgentsFactory, notificationSystem, time); server.startup(); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHttp2Test.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHttp2Test.java new file mode 100644 index 0000000000..4f1115194a --- /dev/null +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHttp2Test.java @@ -0,0 +1,84 @@ +/** + * Copyright 2016 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.server; + +import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.clustermap.MockClusterMap; +import com.github.ambry.network.Port; +import com.github.ambry.network.PortType; +import com.github.ambry.utils.SystemTime; +import com.github.ambry.utils.TestUtils; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + + +@RunWith(Parameterized.class) +public class ServerHttp2Test { + private static Properties routerProps; + private static MockNotificationSystem notificationSystem; + private static MockCluster http2Cluster; + private final boolean testEncryption; + + @BeforeClass + public static void initializeTests() throws Exception { + + routerProps = new Properties(); + routerProps.setProperty("kms.default.container.key", TestUtils.getRandomKey(32)); + routerProps.setProperty("clustermap.default.partition.class", MockClusterMap.DEFAULT_PARTITION_CLASS); + + Properties properties = new Properties(); + properties.setProperty("rest.server.rest.request.service.factory", + "com.github.ambry.server.StorageRestRequestService"); + properties.setProperty("rest.server.nio.server.factory", "com.github.ambry.rest.StorageServerNettyFactory"); + properties.setProperty("ssl.client.authentication", "none"); + http2Cluster = new MockCluster(properties, false, SystemTime.getInstance(), 1, 1, 2); + notificationSystem = new MockNotificationSystem(http2Cluster.getClusterMap()); + http2Cluster.initializeServers(notificationSystem); + http2Cluster.startServers(); + } + + /** + * Running for both regular and encrypted blobs + * @return an array with both {@code false} and {@code true}. + */ + @Parameterized.Parameters + public static List data() { + return Arrays.asList(new Object[][]{{false}, }); + } + + public ServerHttp2Test(boolean testEncryption) { + this.testEncryption = testEncryption; + } + + @AfterClass + public static void cleanup() throws IOException { + if (http2Cluster != null) { + http2Cluster.cleanup(); + } + } + + @Test + public void endToEndTest() throws Exception { + DataNodeId dataNodeId = http2Cluster.getGeneralDataNode(); + ServerTestUtil.endToEndTest(new Port(dataNodeId.getHttp2Port(), PortType.HTTP2), "DC1", http2Cluster, null, null, + routerProps, testEncryption); + } +} diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerPlaintextTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerPlaintextTest.java index 0f2ef9b1eb..9728b28e72 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerPlaintextTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerPlaintextTest.java @@ -15,6 +15,7 @@ import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.MockClusterMap; +import com.github.ambry.commons.TestSSLUtils; import com.github.ambry.network.Port; import com.github.ambry.network.PortType; import com.github.ambry.utils.SystemTime; @@ -47,7 +48,10 @@ public static void initializeTests() throws Exception { routerProps = new Properties(); routerProps.setProperty("kms.default.container.key", TestUtils.getRandomKey(32)); routerProps.setProperty("clustermap.default.partition.class", MockClusterMap.DEFAULT_PARTITION_CLASS); - plaintextCluster = new MockCluster(new Properties(), false, SystemTime.getInstance()); + + Properties serverProperties = new Properties(); + TestSSLUtils.addHttp2Properties(serverProperties); + plaintextCluster = new MockCluster(serverProperties, false, SystemTime.getInstance()); notificationSystem = new MockNotificationSystem(plaintextCluster.getClusterMap()); plaintextCluster.initializeServers(notificationSystem); plaintextCluster.startServers(); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerPlaintextTokenTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerPlaintextTokenTest.java index 8301118803..c6f9f3f53a 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerPlaintextTokenTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerPlaintextTokenTest.java @@ -15,6 +15,7 @@ import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.MockClusterMap; +import com.github.ambry.commons.TestSSLUtils; import com.github.ambry.network.Port; import com.github.ambry.network.PortType; import com.github.ambry.utils.SystemTime; @@ -65,7 +66,9 @@ public void initializeTests() throws Exception { routerProps = new Properties(); routerProps.setProperty("kms.default.container.key", TestUtils.getRandomKey(32)); routerProps.setProperty("clustermap.default.partition.class", MockClusterMap.DEFAULT_PARTITION_CLASS); - plaintextCluster = new MockCluster(new Properties(), false, SystemTime.getInstance()); + Properties serverProperties = new Properties(); + TestSSLUtils.addHttp2Properties(serverProperties); + plaintextCluster = new MockCluster(serverProperties, false, SystemTime.getInstance()); notificationSystem = new MockNotificationSystem(plaintextCluster.getClusterMap()); plaintextCluster.initializeServers(notificationSystem); plaintextCluster.startServers(); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerSSLTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerSSLTest.java index 5fa7a944e7..3905b65f83 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerSSLTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerSSLTest.java @@ -68,6 +68,7 @@ public static void initializeTests() throws Exception { new SSLConfig(TestSSLUtils.createSslProps("DC1,DC2", SSLFactory.Mode.CLIENT, trustStoreFile, "client3")); serverSSLProps = new Properties(); TestSSLUtils.addSSLProperties(serverSSLProps, "DC1,DC2,DC3", SSLFactory.Mode.SERVER, trustStoreFile, "server"); + TestSSLUtils.addHttp2Properties(serverSSLProps); routerProps = new Properties(); routerProps.setProperty("kms.default.container.key", TestUtils.getRandomKey(32)); routerProps.setProperty("clustermap.default.partition.class", MockClusterMap.DEFAULT_PARTITION_CLASS); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerSSLTokenTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerSSLTokenTest.java index 919d538b6c..dba4fc60e1 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerSSLTokenTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerSSLTokenTest.java @@ -72,6 +72,7 @@ public void initializeTests() throws Exception { new SSLConfig(TestSSLUtils.createSslProps("DC2,DC3", SSLFactory.Mode.CLIENT, trustStoreFile, "client1")); serverSSLProps = new Properties(); TestSSLUtils.addSSLProperties(serverSSLProps, "DC1,DC2,DC3", SSLFactory.Mode.SERVER, trustStoreFile, "server"); + TestSSLUtils.addHttp2Properties(serverSSLProps); routerProps = new Properties(); routerProps.setProperty("kms.default.container.key", TestUtils.getRandomKey(32)); TestSSLUtils.addSSLProperties(routerProps, "DC1,DC2,DC3", SSLFactory.Mode.CLIENT, trustStoreFile, "router-client"); diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/VcrBackupTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/VcrBackupTest.java index 52ccccf16f..d506b9f75e 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/VcrBackupTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/VcrBackupTest.java @@ -21,6 +21,7 @@ import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.commons.BlobId; +import com.github.ambry.commons.TestSSLUtils; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.messageformat.BlobProperties; import com.github.ambry.network.ConnectedChannel; @@ -66,7 +67,9 @@ public class VcrBackupTest { @Before public void setup() throws Exception { - mockCluster = new MockCluster(new Properties(), false, SystemTime.getInstance(), 1, 1, numOfPartitions); + Properties props = new Properties(); + TestSSLUtils.addHttp2Properties(props); + mockCluster = new MockCluster(props, false, SystemTime.getInstance(), 1, 1, numOfPartitions); notificationSystem = new MockNotificationSystem(mockCluster.getClusterMap()); mockCluster.initializeServers(notificationSystem); mockCluster.startServers(); diff --git a/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java b/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java index 309f2c9291..e12250c1c0 100644 --- a/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java +++ b/ambry-server/src/main/java/com.github.ambry.server/AmbryServer.java @@ -22,6 +22,7 @@ import com.github.ambry.clustermap.ClusterSpectatorFactory; import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.commons.LoggingNotificationSystem; +import com.github.ambry.commons.SSLFactory; import com.github.ambry.commons.ServerMetrics; import com.github.ambry.config.CloudConfig; import com.github.ambry.config.ClusterMapConfig; @@ -29,6 +30,7 @@ import com.github.ambry.config.DiskManagerConfig; import com.github.ambry.config.NetworkConfig; import com.github.ambry.config.ReplicationConfig; +import com.github.ambry.config.RestServerConfig; import com.github.ambry.config.SSLConfig; import com.github.ambry.config.ServerConfig; import com.github.ambry.config.StatsManagerConfig; @@ -38,6 +40,7 @@ import com.github.ambry.messageformat.BlobStoreRecovery; import com.github.ambry.network.BlockingChannelConnectionPool; import com.github.ambry.network.ConnectionPool; +import com.github.ambry.network.NettyServerRequestResponseChannel; import com.github.ambry.network.NetworkServer; import com.github.ambry.network.Port; import com.github.ambry.network.PortType; @@ -48,6 +51,14 @@ import com.github.ambry.replication.CloudToStoreReplicationManager; import com.github.ambry.replication.FindTokenHelper; import com.github.ambry.replication.ReplicationManager; +import com.github.ambry.rest.NettySslHttp2Factory; +import com.github.ambry.rest.NioServer; +import com.github.ambry.rest.NioServerFactory; +import com.github.ambry.rest.RestRequestHandler; +import com.github.ambry.rest.RestRequestResponseHandlerFactory; +import com.github.ambry.rest.RestRequestService; +import com.github.ambry.rest.RestServerState; +import com.github.ambry.rest.StorageServerNettyFactory; import com.github.ambry.store.StorageManager; import com.github.ambry.store.StoreKeyConverterFactory; import com.github.ambry.store.StoreKeyFactory; @@ -95,6 +106,9 @@ public class AmbryServer { private final NotificationSystem notificationSystem; private ServerMetrics metrics = null; private Time time; + private RequestHandlerPool requestHandlerPoolForHttp2; + private RestRequestHandler restRequestHandlerForHttp2; + private NioServer nettyHttp2Server; public AmbryServer(VerifiableProperties properties, ClusterAgentsFactory clusterAgentsFactory, ClusterSpectatorFactory clusterSpectatorFactory, Time time) { @@ -201,6 +215,36 @@ public void startup() throws InstantiationException { networkServer.getRequestResponseChannel(), requests); networkServer.start(); + // Start netty http2 server + if (nodeId.hasHttp2Port()) { + RestServerConfig restServerConfig = new RestServerConfig(properties); + SSLFactory sslFactory = new NettySslHttp2Factory(sslConfig); + RestServerState restServerState = new RestServerState(restServerConfig.restServerHealthCheckUri); + NettyServerRequestResponseChannel requestResponseChannel = new NettyServerRequestResponseChannel(1); + RestRequestService restRequestService = new StorageRestRequestService(requestResponseChannel); + + AmbryServerRequests ambryServerRequestsForHttp2 = + new AmbryServerRequests(storageManager, requestResponseChannel, clusterMap, nodeId, registry, serverMetrics, + findTokenHelper, notificationSystem, replicationManager, storeKeyFactory, serverConfig, + storeKeyConverterFactory, statsManager); + requestHandlerPoolForHttp2 = + new RequestHandlerPool(serverConfig.serverRequestHandlerNumOfThreads, requestResponseChannel, + ambryServerRequestsForHttp2); + + RestRequestResponseHandlerFactory restRequestHandlerFactory = + Utils.getObj(restServerConfig.restServerRequestResponseHandlerFactory, + restServerConfig.restServerRequestHandlerScalingUnitCount, registry, restRequestService); + restRequestHandlerForHttp2 = restRequestHandlerFactory.getRestRequestHandler(); + restRequestHandlerForHttp2.start(); + + NioServerFactory nioServerFactory = + new StorageServerNettyFactory(nodeId.getHttp2Port(), properties, registry, restRequestHandlerForHttp2, + restServerState, sslFactory); + nettyHttp2Server = nioServerFactory.getNioServer(); + nettyHttp2Server.start(); + } + + // Other code List ambryHealthReports = new ArrayList<>(); Set validStatsTypes = new HashSet<>(); for (StatsReportType type : StatsReportType.values()) { @@ -251,6 +295,15 @@ public void shutdown() { if (networkServer != null) { networkServer.shutdown(); } + if (nettyHttp2Server != null) { + nettyHttp2Server.shutdown(); + } + if (restRequestHandlerForHttp2 != null) { + restRequestHandlerForHttp2.shutdown(); + } + if (requestHandlerPoolForHttp2 != null) { + requestHandlerPoolForHttp2.shutdown(); + } if (requestHandlerPool != null) { requestHandlerPool.shutdown(); } diff --git a/ambry-server/src/test/java/com.github.ambry.server/Http2BlockingChannel.java b/ambry-server/src/test/java/com.github.ambry.server/Http2BlockingChannel.java index f5ff1c2c65..b485c8ad8f 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/Http2BlockingChannel.java +++ b/ambry-server/src/test/java/com.github.ambry.server/Http2BlockingChannel.java @@ -19,6 +19,9 @@ import com.github.ambry.network.ChannelOutput; import com.github.ambry.network.ConnectedChannel; import com.github.ambry.network.Send; +import com.github.ambry.rest.Http2ClientChannelInitializer; +import com.github.ambry.rest.Http2ClientStreamInitializer; +import com.github.ambry.rest.Http2ResponseHandler; import com.github.ambry.rest.NettySslHttp2Factory; import com.github.ambry.utils.ByteBufferChannel; import com.github.ambry.utils.NettyByteBufDataInputStream; @@ -26,9 +29,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; @@ -39,12 +40,15 @@ import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; +import io.netty.util.concurrent.Promise; import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.security.GeneralSecurityException; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,14 +59,14 @@ */ public class Http2BlockingChannel implements ConnectedChannel { private static final Logger logger = LoggerFactory.getLogger(Http2BlockingChannel.class); - private final Http2ResponseHandler http2ResponseHandler; private final String hostName; private final int port; private EventLoopGroup workerGroup; private Channel channel; + private Promise responsePromise; + private Http2StreamChannelBootstrap http2StreamChannelBootstrap; public Http2BlockingChannel(String hostName, int port) { - http2ResponseHandler = new Http2ResponseHandler(); this.hostName = hostName; this.port = port; } @@ -86,6 +90,8 @@ public void connect() throws IOException { // Start the client. channel = b.connect().syncUninterruptibly().channel(); logger.info("Connected to remote host"); + Http2ClientStreamInitializer initializer = new Http2ClientStreamInitializer(new Http2ResponseHandler()); + http2StreamChannelBootstrap = new Http2StreamChannelBootstrap(channel).handler(initializer); } @Override @@ -103,24 +109,27 @@ public void send(Send request) throws IOException { byteBufferChannel.getBuffer().position(0); ByteBuf byteBuf = Unpooled.wrappedBuffer(byteBufferChannel.getBuffer()); - Http2ClientStreamInitializer initializer = new Http2ClientStreamInitializer(http2ResponseHandler); - Http2StreamChannel childChannel = - new Http2StreamChannelBootstrap(channel).handler(initializer).open().syncUninterruptibly().getNow(); + Http2StreamChannel childChannel = http2StreamChannelBootstrap.open().syncUninterruptibly().getNow(); Http2Headers http2Headers = new DefaultHttp2Headers().method(HttpMethod.POST.asciiName()).scheme("https").path("/"); + responsePromise = childChannel.eventLoop().newPromise(); + childChannel.attr(Http2ResponseHandler.RESPONSE_PROMISE).set(responsePromise); DefaultHttp2HeadersFrame headersFrame = new DefaultHttp2HeadersFrame(http2Headers, false); DefaultHttp2DataFrame dataFrame = new DefaultHttp2DataFrame(byteBuf, true); - ChannelPromise childChannelPromise = childChannel.newPromise(); childChannel.write(headersFrame); - ChannelFuture channelFuture = childChannel.write(dataFrame); + childChannel.write(dataFrame); childChannel.flush(); - http2ResponseHandler.put(channelFuture, childChannelPromise); } @Override public ChannelOutput receive() throws IOException { - Http2ResponseHandler.StreamResult streamResult = http2ResponseHandler.awaitResponses(5, TimeUnit.SECONDS); - DataInputStream dataInputStream = new NettyByteBufDataInputStream(streamResult.getByteBuf()); + ByteBuf responseByteBuf; + try { + responseByteBuf = responsePromise.get(3, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new IOException("No response received in 3 seconds."); + } + DataInputStream dataInputStream = new NettyByteBufDataInputStream(responseByteBuf); return new ChannelOutput(dataInputStream, dataInputStream.readLong()); } diff --git a/ambry-server/src/test/java/com.github.ambry.server/Http2ResponseHandler.java b/ambry-server/src/test/java/com.github.ambry.server/Http2ResponseHandler.java deleted file mode 100644 index cf9694b6de..0000000000 --- a/ambry-server/src/test/java/com.github.ambry.server/Http2ResponseHandler.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package com.github.ambry.server; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http2.HttpConversionUtil; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Process {@link io.netty.handler.codec.http.FullHttpResponse} translated from HTTP/2 frames - */ -@ChannelHandler.Sharable -public class Http2ResponseHandler extends SimpleChannelInboundHandler { - - protected final Logger logger = LoggerFactory.getLogger(getClass()); - private StreamResult streamResult; - - public Http2ResponseHandler() { - - } - - /** - * Track the {@link ChannelFuture} and {@link ChannelPromise} - * - * @param writeFuture A future that represent the request write operation - * @param promise The promise object that will be used to wait/notify events - * @return The previous object associated with {@code streamId} - * @see Http2ResponseHandler#awaitResponses(long, TimeUnit) - */ - public void put(ChannelFuture writeFuture, ChannelPromise promise) { - streamResult = new StreamResult(writeFuture, promise); - } - - /** - * Wait (sequentially) for a time duration for each anticipated response - * - * @param timeout Value of time to wait for each response - * @param unit Units associated with {@code timeout} - * @see Http2ResponseHandler#put(ChannelFuture, ChannelPromise) - */ - public StreamResult awaitResponses(long timeout, TimeUnit unit) { - ChannelFuture writeFuture = streamResult.getChannelFuture(); - if (!writeFuture.awaitUninterruptibly(timeout, unit)) { - throw new IllegalStateException("Timed out waiting to write for stream id " + streamResult.getChannelFuture()); - } - if (!writeFuture.isSuccess()) { - throw new RuntimeException(writeFuture.cause()); - } - ChannelPromise promise = streamResult.getChannelPromise(); - if (!promise.awaitUninterruptibly(timeout, unit)) { - throw new IllegalStateException("Timed out waiting for response on stream id " + streamResult.getChannelFuture()); - } - if (!promise.isSuccess()) { - throw new RuntimeException(promise.cause()); - } - return streamResult; - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception { - Integer streamId = msg.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text()); - if (streamId == null) { - logger.error("Http2ResponseHandler unexpected message received: " + msg); - return; - } - logger.trace("Stream response received."); - streamResult.setByteBuf(msg.content().retainedDuplicate()); - streamResult.getChannelPromise().setSuccess(); - } - - class StreamResult { - private final ChannelFuture channelFuture; - private final ChannelPromise channelPromise; - private ByteBuf byteBuf; - - StreamResult(ChannelFuture channelFuture, ChannelPromise channelPromise) { - this.channelFuture = channelFuture; - this.channelPromise = channelPromise; - } - - public ChannelFuture getChannelFuture() { - return channelFuture; - } - - public ChannelPromise getChannelPromise() { - return channelPromise; - } - - public void setByteBuf(ByteBuf byteBuf) { - this.byteBuf = byteBuf; - } - - public ByteBuf getByteBuf() { - return byteBuf; - } - } -} diff --git a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java index 694a2298fe..dc6200f72d 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java +++ b/ambry-server/src/test/java/com.github.ambry.server/MockStorageManager.java @@ -30,6 +30,8 @@ import com.github.ambry.protocol.RequestOrResponseType; import com.github.ambry.replication.FindToken; import com.github.ambry.replication.FindTokenHelper; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import com.github.ambry.store.FindInfo; import com.github.ambry.store.MessageInfo; import com.github.ambry.store.MessageReadSet; @@ -91,6 +93,11 @@ public long writeTo(int index, WritableByteChannel channel, long relativeOffset, return 0; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + + } + @Override public int count() { return 0; diff --git a/ambry-store/src/main/java/com.github.ambry.store/StoreMessageReadSet.java b/ambry-store/src/main/java/com.github.ambry.store/StoreMessageReadSet.java index 32a87aba24..45c88c6113 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/StoreMessageReadSet.java +++ b/ambry-store/src/main/java/com.github.ambry.store/StoreMessageReadSet.java @@ -15,6 +15,8 @@ import com.github.ambry.account.Account; import com.github.ambry.account.Container; +import com.github.ambry.router.AsyncWritableChannel; +import com.github.ambry.router.Callback; import com.github.ambry.utils.Pair; import com.github.ambry.utils.Utils; import java.io.Closeable; @@ -208,6 +210,27 @@ public long writeTo(int index, WritableByteChannel channel, long relativeOffset, return written; } + @Override + public void writeTo(AsyncWritableChannel channel, Callback callback) { + int lastIndex = readOptions.size() - 1; + int i = 0; + for (BlobReadOptions options : readOptions) { + ByteBuffer buf = options.getPrefetchedData(); + if (buf == null) { + callback.onCompletion(null, new IllegalStateException("Data should be prefetched.")); + } + buf.position(0); + if (i == lastIndex) { + // only the last one needs callback. + channel.write(buf, callback); + } else { + // TODO: Stop writing to the channel whenever there is an exception and stop the for loop. + channel.write(buf, null); + } + i++; + } + } + @Override public int count() { return readOptions.size(); diff --git a/build.gradle b/build.gradle index 0e3303edd5..4a107172b9 100644 --- a/build.gradle +++ b/build.gradle @@ -349,7 +349,8 @@ project(':ambry-router') { project(':ambry-messageformat'), project(':ambry-protocol'), project(':ambry-network'), - project(':ambry-cloud') + project(':ambry-cloud'), + project(':ambry-rest') compile "io.dropwizard.metrics:metrics-core:$metricsVersion" compile "org.bouncycastle:bcpkix-jdk15on:$bouncycastleVersion" compile "io.netty:netty-buffer:$nettyVersion" diff --git a/config/server.properties b/config/server.properties index 3184902dde..96fa83d241 100644 --- a/config/server.properties +++ b/config/server.properties @@ -17,3 +17,8 @@ clustermap.cluster.name=Ambry_Dev clustermap.datacenter.name=Datacenter clustermap.host.name=localhost clustermap.port=6667 + +# server http2 +rest.server.rest.request.service.factory=com.github.ambry.server.StorageRestRequestService +rest.server.nio.server.factory=com.github.ambry.rest.StorageServerNettyFactory +ssl.client.authentication=none