Skip to content

Commit

Permalink
Add Netty Based HTTP2 implementation for storage server. (#1352)
Browse files Browse the repository at this point in the history
Add AsyncWritableChannel interface for MessageReadSet and Send.
Add StorageServerNettyFactory and StorageServerNettyChannelInitializer.
Add HTTP2 availability in AmbryServer but it is disabled until HTTP2 port populated in Helix.
Add ServerHttp2Test.
  • Loading branch information
zzmao authored Feb 10, 2020
1 parent b846a56 commit 3f47a08
Show file tree
Hide file tree
Showing 43 changed files with 670 additions and 206 deletions.
2 changes: 1 addition & 1 deletion ambry-api/src/main/java/com.github.ambry/network/Send.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface Send {
/**
* Placeholder to support {@link AsyncWritableChannel}
*/
default void writeTo(AsyncWritableChannel channel, Callback<Long> callback) throws IOException {
default void writeTo(AsyncWritableChannel channel, Callback<Long> callback) {
return;
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ public static final class Headers {
* Response header indicating the reason a request is non compliant.
*/
public final static String NON_COMPLIANCE_WARNING = "x-ambry-non-compliance-warning";

}

public static final class TrackingHeaders {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.github.ambry.store;

import com.github.ambry.router.AsyncWritableChannel;
import com.github.ambry.router.Callback;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;

Expand All @@ -36,7 +38,15 @@ public interface MessageReadSet {
long writeTo(int index, WritableByteChannel channel, long relativeOffset, long maxSize) throws IOException;

/**
* Returns the total number of messages in this set
* This method is intend to write prefetched data from {@link MessageReadSet} to {@link AsyncWritableChannel}. Data
* should be ready in memory(no blocking call) before write to {@link AsyncWritableChannel} asynchronously. Callback is
* called when the entire batch of writes succeeds or fails.
* @param channel the channel into which the data needs to be written to
* @param callback The callback when data is fully wrote to the channel.
*/
void writeTo(AsyncWritableChannel channel, Callback<Long> callback);

/**
* @return The total number of messages in this set
*/
int count();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package com.github.ambry.cloud;

import com.github.ambry.commons.BlobId;
import com.github.ambry.router.AsyncWritableChannel;
import com.github.ambry.router.Callback;
import com.github.ambry.store.MessageReadSet;
import com.github.ambry.store.StoreException;
import com.github.ambry.store.StoreKey;
Expand All @@ -25,6 +27,7 @@
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;


/**
Expand Down Expand Up @@ -66,6 +69,12 @@ public long writeTo(int index, WritableByteChannel channel, long relativeOffset,
return written;
}

@Override
public void writeTo(AsyncWritableChannel channel, Callback<Long> callback) {
// TODO: read from cloud based store and write to AsyncWritableChannel is needed in the future.
throw new UnsupportedOperationException();
}

@Override
public int count() {
return blobReadInfoList.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class MockClusterMap implements ClusterMap {
public static final String SPECIAL_PARTITION_CLASS = "specialPartitionClass";
public static final int PLAIN_TEXT_PORT_START_NUMBER = 62000;
public static final int SSL_PORT_START_NUMBER = 63000;
public static final int HTTP2_PORT_START_NUMBER = 64000;

protected final boolean enableSSLPorts;
protected final Map<Long, PartitionId> partitions;
Expand Down Expand Up @@ -117,6 +118,7 @@ public MockClusterMap(boolean enableSSLPorts, int numNodes, int numMountPointsPe
String dcName = null;
int currentPlainTextPort = PLAIN_TEXT_PORT_START_NUMBER;
int currentSSLPort = SSL_PORT_START_NUMBER;
int currentHttp2Port = HTTP2_PORT_START_NUMBER;
for (int i = 0; i < numNodes; i++) {
if (i % 3 == 0) {
dcIndex++;
Expand All @@ -126,9 +128,11 @@ public MockClusterMap(boolean enableSSLPorts, int numNodes, int numMountPointsPe
MockDataNodeId dataNodeId;
if (enableSSLPorts) {
dataNodeId =
createDataNode(getListOfPorts(currentPlainTextPort++, currentSSLPort++), dcName, numMountPointsPerNode);
createDataNode(getListOfPorts(currentPlainTextPort++, currentSSLPort++, currentHttp2Port++), dcName,
numMountPointsPerNode);
} else {
dataNodeId = createDataNode(getListOfPorts(currentPlainTextPort++), dcName, numMountPointsPerNode);
dataNodeId = createDataNode(getListOfPorts(currentPlainTextPort++, null, currentHttp2Port++), dcName,
numMountPointsPerNode);
}
dataNodes.add(dataNodeId);
dcToDataNodes.computeIfAbsent(dcName, name -> new ArrayList<>()).add(dataNodeId);
Expand Down Expand Up @@ -244,16 +248,15 @@ public static MockClusterMap createOneNodeRecoveryClusterMap(MockDataNodeId reco
return new MockClusterMap(recoveryNode, vcrNode, dcName);
}

protected ArrayList<Port> getListOfPorts(int port) {
ArrayList<Port> ports = new ArrayList<>();
ports.add(new Port(port, PortType.PLAINTEXT));
return ports;
}

public static ArrayList<Port> getListOfPorts(int port, int sslPort) {
public static ArrayList<Port> getListOfPorts(int port, Integer sslPort, Integer http2Port) {
ArrayList<Port> ports = new ArrayList<Port>();
ports.add(new Port(port, PortType.PLAINTEXT));
ports.add(new Port(sslPort, PortType.SSL));
if (sslPort != null) {
ports.add(new Port(sslPort, PortType.SSL));
}
if (http2Port != null) {
ports.add(new Port(http2Port, PortType.HTTP2));
}
return ports;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,17 @@ public static void addSSLProperties(Properties props, String sslEnabledDatacente
addSSLProperties(props, sslEnabledDatacenters, mode, trustStoreFile, certAlias, SSL_CONTEXT_PROVIDER);
}

/**
* Setup HTTP2 server related properties.
* @param properties the {@link Properties} instance.
*/
public static void addHttp2Properties(Properties properties) {
properties.setProperty("rest.server.rest.request.service.factory",
"com.github.ambry.server.StorageRestRequestService");
properties.setProperty("rest.server.nio.server.factory", "com.github.ambry.rest.StorageServerNettyFactory");
properties.setProperty("ssl.client.authentication", "none");
}

/**
* Generate a cert and add SSL related properties to {@code props}
* @param props the {@link Properties} instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package com.github.ambry.messageformat;

import com.github.ambry.network.Send;
import com.github.ambry.router.AsyncWritableChannel;
import com.github.ambry.router.Callback;
import com.github.ambry.store.MessageReadSet;
import com.github.ambry.store.StoreKey;
import com.github.ambry.store.StoreKeyFactory;
Expand Down Expand Up @@ -266,6 +268,11 @@ public long writeTo(WritableByteChannel channel) throws IOException {
return written;
}

@Override
public void writeTo(AsyncWritableChannel channel, Callback<Long> callback) {
readSet.writeTo(channel, callback);
}

@Override
public boolean isSendComplete() {
return totalSizeToWrite == sizeWritten;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.github.ambry.messageformat;

import com.github.ambry.router.AsyncWritableChannel;
import com.github.ambry.router.Callback;
import com.github.ambry.store.HardDeleteInfo;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MessageReadSet;
Expand Down Expand Up @@ -286,6 +288,11 @@ public long writeTo(int index, WritableByteChannel channel, long relativeOffset,
return channel.write(ByteBuffer.wrap(toReturn));
}

@Override
public void writeTo(AsyncWritableChannel channel, Callback<Long> callback) {

}

@Override
public int count() {
return messageList.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package com.github.ambry.messageformat;

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.router.AsyncWritableChannel;
import com.github.ambry.router.Callback;
import com.github.ambry.store.MessageReadSet;
import com.github.ambry.store.MockId;
import com.github.ambry.store.MockIdFactory;
Expand Down Expand Up @@ -86,6 +88,11 @@ public long writeTo(int index, WritableByteChannel channel, long relativeOffset,
return written;
}

@Override
public void writeTo(AsyncWritableChannel channel, Callback<Long> callback) {

}

@Override
public int count() {
return buffers.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import com.github.ambry.rest.RestResponseChannel;
import com.github.ambry.rest.RestUtils;
import com.github.ambry.router.Callback;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;

Expand All @@ -32,6 +34,14 @@ public NettyServerRequestResponseChannel(int queueSize) {
/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
@Override
public void sendRequest(NetworkRequest request) throws InterruptedException {
DataInputStream stream = new DataInputStream(request.getInputStream());
try {
// The first 8 bytes is size of the request. TCP implementation uses this size to allocate buffer. See {@link BoundedReceive}
// Here we just need to consume it.
stream.readLong();
} catch (IOException e) {
throw new IllegalStateException("stream read error." + e);
}
requestQueue.put(request);
}

Expand All @@ -46,11 +56,8 @@ public void sendResponse(Send payloadToSend, NetworkRequest originalRequest, Ser

RestResponseChannel restResponseChannel = ((NettyServerRequest) originalRequest).getRestResponseChannel();
restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, payloadToSend.sizeInBytes());
try {
payloadToSend.writeTo(restResponseChannel, null); // an extra copy
} catch (IOException e) {
throw new InterruptedException(e.toString());
}
payloadToSend.writeTo(restResponseChannel, (result, exception) -> {
});// an extra copy
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.github.ambry.protocol;

import com.github.ambry.router.AsyncWritableChannel;
import com.github.ambry.router.Callback;
import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.utils.Utils;
import java.io.DataInputStream;
Expand Down Expand Up @@ -59,15 +61,30 @@ public static AdminResponse readFrom(DataInputStream stream) throws IOException
return new AdminResponse(correlationId, clientId, error);
}

@Override
public long writeTo(WritableByteChannel channel) throws IOException {
/**
* A private method shared by {@link AdminResponse#writeTo(WritableByteChannel)} and
* {@link AdminResponse#writeTo(AsyncWritableChannel, Callback)}.
* This method allocate bufferToSend and write headers to it if bufferToSend is null.
*/
private void prepareBufferToSend() {
if (bufferToSend == null) {
serializeIntoBuffer();
bufferToSend.flip();
}
}

@Override
public long writeTo(WritableByteChannel channel) throws IOException {
prepareBufferToSend();
return bufferToSend.hasRemaining() ? channel.write(bufferToSend) : 0;
}

@Override
public void writeTo(AsyncWritableChannel channel, Callback<Long> callback) {
prepareBufferToSend();
channel.write(bufferToSend, callback);
}

@Override
public boolean isSendComplete() {
return bufferToSend != null && bufferToSend.remaining() == 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package com.github.ambry.protocol;

import com.github.ambry.network.Send;
import com.github.ambry.router.AsyncWritableChannel;
import com.github.ambry.router.Callback;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;
Expand All @@ -24,33 +26,51 @@
*/
public class CompositeSend implements Send {

private final List<Send> compositSendList;
private final List<Send> compositeSendList;
private long totalSizeToWrite;
private int currentIndexInProgress;

public CompositeSend(List<Send> compositSendList) {
this.compositSendList = compositSendList;
public CompositeSend(List<Send> compositeSendList) {
this.compositeSendList = compositeSendList;
this.currentIndexInProgress = 0;
for (Send messageFormatSend : compositSendList) {
for (Send messageFormatSend : compositeSendList) {
totalSizeToWrite += messageFormatSend.sizeInBytes();
}
}

@Override
public long writeTo(WritableByteChannel channel) throws IOException {
long written = 0;
if (currentIndexInProgress < compositSendList.size()) {
written = compositSendList.get(currentIndexInProgress).writeTo(channel);
if (compositSendList.get(currentIndexInProgress).isSendComplete()) {
if (currentIndexInProgress < compositeSendList.size()) {
written = compositeSendList.get(currentIndexInProgress).writeTo(channel);
if (compositeSendList.get(currentIndexInProgress).isSendComplete()) {
currentIndexInProgress++;
}
}
return written;
}

@Override
public void writeTo(AsyncWritableChannel channel, Callback<Long> callback) {
int lastIndex = compositeSendList.size() - 1;
int i = 0;
// This callback technically won't be set to the correct value since it will only reflect the size of the last send,
// not all sends in the batch. This may not currently be a problem but is something to look out for.
for (Send send : compositeSendList) {
if (i == lastIndex) {
// only the last one pass in callback
send.writeTo(channel, callback);
} else {
//TODO: stop writing to the channel whenever there is an exception here and stop the for loop.
send.writeTo(channel, null);
}
i++;
}
}

@Override
public boolean isSendComplete() {
return currentIndexInProgress == compositSendList.size();
return currentIndexInProgress == compositeSendList.size();
}

@Override
Expand Down
Loading

0 comments on commit 3f47a08

Please sign in to comment.