From 75388d989bd8c9c361e2c7b38f3667bf8b7aa67d Mon Sep 17 00:00:00 2001 From: Justin Lin Date: Fri, 14 Feb 2020 14:32:57 -0800 Subject: [PATCH] Default to use netty bytebuf in network layer and remove getAndRelease method (#1375) This pr includes two changes: 1. Default to use netty bytebuf in network layer, which will remove lots of code dealing with java bytebuffer 2. Remove getAndRelease and replace it with content method and implements ByteBufHolder interface. --- .../config/NetworkConfig.java | 12 -- .../com.github.ambry/config/RouterConfig.java | 9 -- .../network/BoundedByteBufferReceive.java | 95 --------------- .../network/BoundedNettyByteBufReceive.java | 48 ++++---- .../network/BoundedReceive.java | 34 ------ .../network/NetworkReceive.java | 6 +- .../network/NetworkRequest.java | 9 +- .../network/ResponseInfo.java | 66 ++++------ .../BlobData.java | 68 ++--------- .../ValidatingTransformer.java | 2 +- .../MessageFormatInputStreamTest.java | 2 +- .../MessageFormatRecordTest.java | 27 +++- .../MessageFormatSendTest.java | 17 ++- .../MessageSievingInputStreamTest.java | 4 +- .../LocalNetworkClient.java | 6 +- .../LocalRequestResponseChannel.java | 18 +-- .../SocketNetworkClient.java | 4 +- .../SocketRequestResponseChannel.java | 35 +++--- .../SocketServer.java | 6 +- .../Transmission.java | 7 +- .../BoundedByteBufferReceiveTest.java | 48 -------- .../BoundedNettyByteBufReceiveTest.java | 2 +- .../SSLSelectorTest.java | 27 +++- .../SelectorTest.java | 32 ++++- .../SocketNetworkClientTest.java | 115 +++--------------- .../SocketRequestResponseChannelTest.java | 28 ++++- .../BlobIdTransformer.java | 6 +- .../com.github.ambry.rest/RestServer.java | 8 +- .../DeleteManager.java | 2 +- .../GetBlobOperation.java | 11 +- .../com.github.ambry.router/GetManager.java | 2 +- .../com.github.ambry.router/PutManager.java | 2 +- .../com.github.ambry.router/RouterUtils.java | 5 +- .../TtlUpdateManager.java | 2 +- .../GetBlobInfoOperationTest.java | 18 ++- .../GetBlobOperationTest.java | 26 ++-- .../com.github.ambry.router/MockSelector.java | 29 +---- .../com.github.ambry.router/MockServer.java | 12 +- .../NonBlockingRouterTest.java | 21 +++- .../PutOperationTest.java | 43 +++++-- .../ServerHardDeleteTest.java | 2 +- .../ServerTestUtil.java | 2 +- .../com.github.ambry.server/Verifier.java | 4 +- .../AmbryServerRequestsTest.java | 12 +- .../store/HardDeleteVerifier.java | 13 +- .../tools/admin/BlobValidator.java | 2 +- .../tools/admin/ServerAdminTool.java | 15 +-- .../tools/perf/ServerReadPerformance.java | 3 +- .../AbstractByteBufHolder.java | 96 +++++++++++++++ .../java/com.github.ambry.utils/Utils.java | 59 +-------- .../com.github.ambry.utils/UtilsTest.java | 44 ------- 51 files changed, 461 insertions(+), 705 deletions(-) delete mode 100644 ambry-api/src/main/java/com.github.ambry/network/BoundedByteBufferReceive.java delete mode 100644 ambry-api/src/main/java/com.github.ambry/network/BoundedReceive.java delete mode 100644 ambry-network/src/test/java/com.github.ambry.network/BoundedByteBufferReceiveTest.java create mode 100644 ambry-utils/src/main/java/com.github.ambry.utils/AbstractByteBufHolder.java diff --git a/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java b/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java index bd5192fee8..3ec83bb721 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/NetworkConfig.java @@ -32,8 +32,6 @@ public class NetworkConfig { public static final String SELECTOR_EXECUTOR_POOL_SIZE = "selector.executor.pool.size"; public static final String SELECTOR_MAX_KEY_TO_PROCESS = "selector.max.key.to.process"; public static final String SELECTOR_USE_DIRECT_BUFFERS = "selector.use.direct.buffers"; - public static final String NETWORK_USE_NETTY_BYTE_BUF = "network.use.netty.byte.buf"; - public static final String NETWORK_PUT_REQUEST_SHARE_MEMORY = "network.put.request.share.memory"; /** * The number of io threads that the server uses for carrying out network requests @@ -121,14 +119,6 @@ public class NetworkConfig { @Default("false") public final boolean selectorUseDirectBuffers; - @Config(NETWORK_USE_NETTY_BYTE_BUF) - @Default("false") - public final boolean networkUseNettyByteBuf; - - @Config(NETWORK_PUT_REQUEST_SHARE_MEMORY) - @Default("false") - public final boolean networkPutRequestShareMemory; - public NetworkConfig(VerifiableProperties verifiableProperties) { numIoThreads = verifiableProperties.getIntInRange(NUM_IO_THREADS, 8, 1, Integer.MAX_VALUE); queuedMaxRequests = verifiableProperties.getIntInRange(QUEUED_MAX_REQUESTS, 500, 1, Integer.MAX_VALUE); @@ -147,7 +137,5 @@ public NetworkConfig(VerifiableProperties verifiableProperties) { selectorMaxKeyToProcess = verifiableProperties.getIntInRange(SELECTOR_MAX_KEY_TO_PROCESS, -1, -1, Integer.MAX_VALUE); selectorUseDirectBuffers = verifiableProperties.getBoolean(SELECTOR_USE_DIRECT_BUFFERS, false); - networkUseNettyByteBuf = verifiableProperties.getBoolean(NETWORK_USE_NETTY_BYTE_BUF, false); - networkPutRequestShareMemory = verifiableProperties.getBoolean(NETWORK_PUT_REQUEST_SHARE_MEMORY, false); } } diff --git a/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java b/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java index 6eb79742e7..5482c6379a 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/RouterConfig.java @@ -94,7 +94,6 @@ public class RouterConfig { "router.operation.tracker.histogram.cache.timeout.ms"; public static final String ROUTER_MAX_IN_MEM_PUT_CHUNKS = "router.max.in.mem.put.chunks"; public static final String ROUTER_MAX_IN_MEM_GET_CHUNKS = "router.max.in.mem.get.chunks"; - public static final String ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY = "router.get.blob.operation.share.memory"; public static final String ROUTER_GET_ELIGIBLE_REPLICAS_BY_STATE_ENABLED = "router.get.eligible.replicas.by.state.enabled"; public static final String ROUTER_PUT_USE_DYNAMIC_SUCCESS_TARGET = "router.put.use.dynamic.success.target"; @@ -440,13 +439,6 @@ public class RouterConfig { @Default("4") public final int routerMaxInMemGetChunks; - /** - * If {@code true}, the blob data shares memory with networking buffer in GetBlobOperation - */ - @Config(ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY) - @Default("false") - public final boolean routerGetBlobOperationShareMemory; - /** * if {@code true}, operation tracker will get replicas in required states based on the type of operation. This helps * dynamically manage replicas in cluster (i.e. add/remove/move replicas) without restarting frontends. @@ -558,7 +550,6 @@ public RouterConfig(VerifiableProperties verifiableProperties) { Integer.MAX_VALUE / routerMaxPutChunkSizeBytes); routerMaxInMemGetChunks = verifiableProperties.getIntInRange(ROUTER_MAX_IN_MEM_GET_CHUNKS, 4, 1, Integer.MAX_VALUE / routerMaxPutChunkSizeBytes); - routerGetBlobOperationShareMemory = verifiableProperties.getBoolean(ROUTER_GET_BLOB_OPERATION_SHARE_MEMORY, false); routerGetEligibleReplicasByStateEnabled = verifiableProperties.getBoolean(ROUTER_GET_ELIGIBLE_REPLICAS_BY_STATE_ENABLED, false); routerPutUseDynamicSuccessTarget = verifiableProperties.getBoolean(ROUTER_PUT_USE_DYNAMIC_SUCCESS_TARGET, false); diff --git a/ambry-api/src/main/java/com.github.ambry/network/BoundedByteBufferReceive.java b/ambry-api/src/main/java/com.github.ambry/network/BoundedByteBufferReceive.java deleted file mode 100644 index 66964ec7ea..0000000000 --- a/ambry-api/src/main/java/com.github.ambry/network/BoundedByteBufferReceive.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Copyright 2016 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.network; - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * A byte buffer version of Receive to buffer the incoming request or response. - */ -public class BoundedByteBufferReceive implements BoundedReceive { - - private ByteBuffer buffer = null; - private ByteBuffer sizeBuffer; - private long sizeToRead; - private long sizeRead; - private final static Logger logger = LoggerFactory.getLogger(BoundedByteBufferReceive.class); - - public BoundedByteBufferReceive() { - sizeToRead = 0; - sizeRead = 0; - sizeBuffer = ByteBuffer.allocate(Long.BYTES); - } - - @Override - public boolean isReadComplete() { - return buffer != null && sizeRead >= sizeToRead; - } - - @Override - public long readFrom(ReadableByteChannel channel) throws IOException { - long bytesRead = 0; - if (buffer == null) { - bytesRead = channel.read(sizeBuffer); - if (bytesRead < 0) { - throw new EOFException(); - } - if (sizeBuffer.position() == sizeBuffer.capacity()) { - sizeBuffer.flip(); - sizeToRead = sizeBuffer.getLong(); - sizeRead += Long.BYTES; - buffer = ByteBuffer.allocate((int) sizeToRead - Long.BYTES); - sizeBuffer = null; - } - } - if (buffer != null && sizeRead < sizeToRead) { - long bytesReadFromChannel = channel.read(buffer); - if (bytesReadFromChannel < 0) { - throw new EOFException(); - } - sizeRead += bytesReadFromChannel; - bytesRead += bytesReadFromChannel; - if (sizeRead == sizeToRead) { - buffer.flip(); - } - } - logger.trace("size read from channel {}", sizeRead); - return bytesRead; - } - - @Override - public ByteBuffer getAndRelease() { - try { - return buffer; - } finally { - buffer = null; - } - } - - /** - * The total size in bytes that needs to receive from the channel - * It will be initialized only after header is read. - * @return the size of the data in bytes to receive after reading header, otherwise return 0 - */ - @Override - public long sizeRead() { - return sizeRead; - } -} diff --git a/ambry-api/src/main/java/com.github.ambry/network/BoundedNettyByteBufReceive.java b/ambry-api/src/main/java/com.github.ambry/network/BoundedNettyByteBufReceive.java index 2d4c6d7a2c..74d91534ef 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/BoundedNettyByteBufReceive.java +++ b/ambry-api/src/main/java/com.github.ambry/network/BoundedNettyByteBufReceive.java @@ -13,13 +13,13 @@ */ package com.github.ambry.network; +import com.github.ambry.utils.AbstractByteBufHolder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import java.io.EOFException; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +27,7 @@ /** * A netty {@link ByteBuf} version of Receive to buffer the incoming request or response. */ -public class BoundedNettyByteBufReceive implements BoundedReceive { +public class BoundedNettyByteBufReceive extends AbstractByteBufHolder { private ByteBuf buffer = null; private ByteBuf sizeBuffer = null; @@ -35,9 +35,16 @@ public class BoundedNettyByteBufReceive implements BoundedReceive { private long sizeRead = 0; private final static Logger logger = LoggerFactory.getLogger(BoundedNettyByteBufReceive.class); - @Override + public BoundedNettyByteBufReceive() { + } + + BoundedNettyByteBufReceive(ByteBuf buffer, long sizeToRead) { + this.buffer = Objects.requireNonNull(buffer); + this.sizeToRead = sizeToRead; + } + public boolean isReadComplete() { - return buffer != null && sizeRead >= sizeToRead; + return buffer != null && sizeRead >= sizeToRead; } /** @@ -56,7 +63,6 @@ private int readBytesFromReadableByteChannel(ReadableByteChannel channel, ByteBu return n; } - @Override public long readFrom(ReadableByteChannel channel) throws IOException { long bytesRead = 0; if (buffer == null) { @@ -99,32 +105,22 @@ public long readFrom(ReadableByteChannel channel) throws IOException { return bytesRead; } - /** - * Returns the payload as {@link ByteBuf}, at the same time release the current reference to this payload. - * It's not safe to call this function multiple times. - * @return - */ - @Override - public ByteBuf getAndRelease() { - if (buffer == null) { - return null; - } else { - try { - return buffer.retainedDuplicate(); - } finally { - buffer.release(); - buffer = null; - } - } - } - /** * The total size in bytes that needs to receive from the channel * It will be initialized only after header is read. * @return the size of the data in bytes to receive after reading header, otherwise return 0 */ - @Override public long sizeRead() { return sizeRead; } + + @Override + public ByteBuf content() { + return buffer; + } + + @Override + public BoundedNettyByteBufReceive replace(ByteBuf content) { + return new BoundedNettyByteBufReceive(content, sizeToRead); + } } diff --git a/ambry-api/src/main/java/com.github.ambry/network/BoundedReceive.java b/ambry-api/src/main/java/com.github.ambry/network/BoundedReceive.java deleted file mode 100644 index 365657a954..0000000000 --- a/ambry-api/src/main/java/com.github.ambry/network/BoundedReceive.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Copyright 2019 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.network; - -/** - * This is an interface for {@link Receive} to use any kinds of data structure to buffer the - * incoming request or response from the network. - * @param The type of the buffer. It's either a {@link java.nio.ByteBuffer} or a {@link io.netty.buffer.ByteBuf}. - */ -public interface BoundedReceive extends Receive { - - /** - * Return the buffer and transfer the ownership of this buffer to the caller. It will release the underlying buffer - * and it's not safe to call this function twice. - * @return The byte buffer that contains the bytes from the network. - */ - T getAndRelease(); - - /** - * The size of read bytes from the network layer. - */ - long sizeRead(); -} diff --git a/ambry-api/src/main/java/com.github.ambry/network/NetworkReceive.java b/ambry-api/src/main/java/com.github.ambry/network/NetworkReceive.java index 233c5b8b43..13c1d58fcd 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/NetworkReceive.java +++ b/ambry-api/src/main/java/com.github.ambry/network/NetworkReceive.java @@ -27,14 +27,14 @@ public class NetworkReceive { /** * The bytes received from the destination */ - private final BoundedReceive receivedBytes; + private final BoundedNettyByteBufReceive receivedBytes; /** * The start time of when the receive started */ private final long receiveStartTimeInMs; - public NetworkReceive(String connectionId, BoundedReceive receivedBytes, Time time) { + public NetworkReceive(String connectionId, BoundedNettyByteBufReceive receivedBytes, Time time) { this.connectionId = connectionId; this.receivedBytes = receivedBytes; this.receiveStartTimeInMs = time.milliseconds(); @@ -44,7 +44,7 @@ public String getConnectionId() { return connectionId; } - public BoundedReceive getReceivedBytes() { + public BoundedNettyByteBufReceive getReceivedBytes() { return receivedBytes; } diff --git a/ambry-api/src/main/java/com.github.ambry/network/NetworkRequest.java b/ambry-api/src/main/java/com.github.ambry/network/NetworkRequest.java index 7156ce68e5..53b9e1f872 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/NetworkRequest.java +++ b/ambry-api/src/main/java/com.github.ambry/network/NetworkRequest.java @@ -13,6 +13,7 @@ */ package com.github.ambry.network; +import io.netty.buffer.ByteBuf; import java.io.InputStream; @@ -33,7 +34,11 @@ public interface NetworkRequest { long getStartTimeInMs(); /** - * Release any resource this request is holding. + * Release any resource this request is holding. By default it returns false so this method can be compatible + * with {@link ByteBuf#release()} + * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated */ - default void release() {}; + default boolean release() { + return false; + } } diff --git a/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java b/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java index b498c86690..4bdb18e9c1 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java +++ b/ambry-api/src/main/java/com.github.ambry/network/ResponseInfo.java @@ -14,8 +14,8 @@ package com.github.ambry.network; import com.github.ambry.clustermap.DataNodeId; +import com.github.ambry.utils.AbstractByteBufHolder; import io.netty.buffer.ByteBuf; -import io.netty.util.ReferenceCountUtil; /** @@ -24,26 +24,33 @@ * was an error sending the request or a non-null ByteBuffer containing the successful response received for this * request. Also, this class contains {@link DataNodeId} to which the request is issued. */ -public class ResponseInfo { +public class ResponseInfo extends AbstractByteBufHolder { private final RequestInfo requestInfo; private final NetworkClientErrorCode error; private final DataNodeId dataNode; - private Object response; + private ByteBuf content; /** * Constructs a ResponseInfo with the given parameters. * @param requestInfo the {@link RequestInfo} associated with this response. * @param error the error encountered in sending this request, if there is any. - * @param response the response received for this request. + * @param content the response received for this request. */ - public ResponseInfo(RequestInfo requestInfo, NetworkClientErrorCode error, Object response) { - this(requestInfo, error, response, requestInfo == null ? null : requestInfo.getReplicaId().getDataNodeId()); + public ResponseInfo(RequestInfo requestInfo, NetworkClientErrorCode error, ByteBuf content) { + this(requestInfo, error, content, requestInfo == null ? null : requestInfo.getReplicaId().getDataNodeId()); } - public ResponseInfo(RequestInfo requestInfo, NetworkClientErrorCode error, Object response, DataNodeId dataNode) { + /** + * Constructs a ResponseInfo with the given parameters. + * @param requestInfo the {@link RequestInfo} associated with this response. + * @param error the error encountered in sending this request, if there is any. + * @param content the response received for this request. + * @param dataNode the {@link DataNodeId} of this request. + */ + public ResponseInfo(RequestInfo requestInfo, NetworkClientErrorCode error, ByteBuf content, DataNodeId dataNode) { this.requestInfo = requestInfo; this.error = error; - this.response = response; + this.content = content; this.dataNode = dataNode; } @@ -61,35 +68,6 @@ public NetworkClientErrorCode getError() { return error; } - /** - * @return the response received for this request. - */ - public Object getResponse() { - return response; - } - - /** - * Decrease the reference count of underlying response. - */ - public void release() { - if (response != null) { - ReferenceCountUtil.release(response); - response = null; - } - } - - /** - * Tries to call {@link ByteBuf#touch(Object)} if the specified message implements - * {@link ByteBuf}. If the specified message doesn't implement {@link ByteBuf}, - * this method does nothing. - * @param hint hint object. - */ - public void touch(Object hint) { - if (response != null) { - ReferenceCountUtil.touch(response, hint); - } - } - /** * @return the {@link DataNodeId} with which the response is associated. */ @@ -99,7 +77,17 @@ public DataNodeId getDataNode() { @Override public String toString() { - return "ResponseInfo{" + "requestInfo=" + requestInfo + ", error=" + error + ", response=" + response - + ", dataNode=" + dataNode + '}'; + return "ResponseInfo{requestInfo=" + requestInfo + ", error=" + error + ", response=" + content + ", dataNode=" + + dataNode + '}'; + } + + @Override + public ByteBuf content() { + return content; + } + + @Override + public ResponseInfo replace(ByteBuf content) { + return new ResponseInfo(requestInfo, error, content, dataNode); } } diff --git a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java index fdb0c11e0e..bae2404adc 100644 --- a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java +++ b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/BlobData.java @@ -13,45 +13,28 @@ */ package com.github.ambry.messageformat; -import com.github.ambry.utils.ByteBufferInputStream; +import com.github.ambry.utils.AbstractByteBufHolder; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import java.nio.ByteBuffer; /** * Contains the blob stream along with some required info */ -public class BlobData { +public class BlobData extends AbstractByteBufHolder { private final BlobType blobType; private final long size; - private ByteBuf byteBuf; - private ByteBufferInputStream stream = null; + private ByteBuf content; /** * The blob data contains the stream and other required info * @param blobType {@link BlobType} of the blob * @param size The size of the blob content. - * @param byteBuf The content of this blob in a {@link ByteBuf}. + * @param content The content of this blob in a {@link ByteBuf}. */ - public BlobData(BlobType blobType, long size, ByteBuf byteBuf) { + public BlobData(BlobType blobType, long size, ByteBuf content) { this.blobType = blobType; this.size = size; - this.byteBuf = byteBuf; - } - - /** - * The blob data contains the stream and other required info - * @param blobType {@link BlobType} of the blob - * @param size The size of the blob content. - * @param stream The {@link ByteBufferInputStream} containing the blob content. - */ - @Deprecated - public BlobData(BlobType blobType, long size, ByteBufferInputStream stream) { - this.blobType = blobType; - this.size = size; - this.byteBuf = Unpooled.wrappedBuffer(stream.getByteBuffer()); - this.stream = stream; + this.content = content; } /** @@ -68,40 +51,13 @@ public long getSize() { return size; } - /** - * @return the {@link ByteBufferInputStream} containing the blob content. - */ - @Deprecated - public ByteBufferInputStream getStream() { - if (stream != null) { - return stream; - } - // The blob content is passed as a ByteBuf since the stream is nulle - if (byteBuf == null) { - return null; - } - ByteBuffer temp = ByteBuffer.allocate(byteBuf.readableBytes()); - byteBuf.readBytes(temp); - byteBuf.release(); - byteBuf = null; - temp.flip(); - stream = new ByteBufferInputStream(temp); - return stream; + @Override + public ByteBuf content() { + return content; } - /** - * Return the netty {@link ByteBuf} and then transfer the ownership to the caller. It's not safe - * to call this method more than once. - */ - public ByteBuf getAndRelease() { - if (byteBuf == null) { - return null; - } - try { - return byteBuf.retainedDuplicate(); - } finally { - byteBuf.release(); - byteBuf = null; - } + @Override + public BlobData replace(ByteBuf content) { + return new BlobData(blobType, size, content); } } diff --git a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java index e47d0df82c..8cec009099 100644 --- a/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java +++ b/ambry-messageformat/src/main/java/com.github.ambry.messageformat/ValidatingTransformer.java @@ -78,7 +78,7 @@ public TransformationOutput transform(Message message) { // @todo, when enabling netty in ambry-server, release this ByteBuf. PutMessageFormatInputStream transformedStream = new PutMessageFormatInputStream(keyInStream, encryptionKey, props, metadata, - new ByteBufInputStream(blobData.getAndRelease(), true), blobData.getSize(), blobData.getBlobType()); + new ByteBufInputStream(blobData.content(), true), blobData.getSize(), blobData.getBlobType()); MessageInfo transformedMsgInfo = new MessageInfo(keyInStream, transformedStream.getSize(), msgInfo.isDeleted(), msgInfo.isTtlUpdated(), msgInfo.getExpirationTimeInMs(), msgInfo.getCrc(), msgInfo.getAccountId(), msgInfo.getContainerId(), diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java index 0c4d8e8fa4..543527c5db 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatInputStreamTest.java @@ -255,7 +255,7 @@ private void messageFormatPutRecordsTest(short blobVersion, BlobType blobType, s } else { Assert.assertEquals(null, blobAll.getBlobEncryptionKey()); } - ByteBuf byteBuf = blobAll.getBlobData().getAndRelease(); + ByteBuf byteBuf = blobAll.getBlobData().content(); try { Assert.assertEquals(Unpooled.wrappedBuffer(data), byteBuf); } finally { diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java index 0efb2e8d1e..a491d1e2ac 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatRecordTest.java @@ -21,6 +21,7 @@ import com.github.ambry.store.StoreKeyFactory; import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Crc32; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.Pair; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; @@ -33,7 +34,9 @@ import java.util.List; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import static com.github.ambry.account.Account.*; @@ -46,6 +49,18 @@ public class MessageFormatRecordTest { + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } + //TODO Separate this mega test into smaller tests @Test public void deserializeTest() throws MessageFormatException, IOException { @@ -173,8 +188,9 @@ public void deserializeTest() throws MessageFormatException, IOException { BlobData blobData = MessageFormatRecord.deserializeBlob(new ByteBufferInputStream(sData)); Assert.assertEquals(blobData.getSize(), 2000); byte[] verify = new byte[2000]; - blobData.getAndRelease().readBytes(verify); + blobData.content().readBytes(verify); Assert.assertArrayEquals(verify, data.array()); + blobData.release(); // corrupt blob record V1 sData.flip(); @@ -640,8 +656,9 @@ private void testBlobRecordV2(int blobSize, BlobType blobType) throws IOExceptio BlobData blobData = getBlobRecordV2(blobSize, blobType, blobContent, entireBlob); Assert.assertEquals("Blob size mismatch", blobSize, blobData.getSize()); byte[] verify = new byte[blobSize]; - blobData.getAndRelease().readBytes(verify); + blobData.content().readBytes(verify); Assert.assertArrayEquals("BlobContent mismatch", blobContent.array(), verify); + blobData.release(); // corrupt blob record V2 entireBlob.flip(); @@ -694,8 +711,9 @@ public void testBlobRecordWithMetadataContentV2() throws IOException, MessageFor Assert.assertEquals(metadataContentSize, blobData.getSize()); byte[] verify = new byte[metadataContentSize]; - blobData.getAndRelease().readBytes(verify); + blobData.content().readBytes(verify); Assert.assertArrayEquals("Metadata content mismatch", metadataContent.array(), verify); + blobData.release(); // deserialize and check for metadata contents metadataContent.rewind(); @@ -728,8 +746,9 @@ public void testBlobRecordWithMetadataContentV3() throws IOException, MessageFor BlobData blobData = getBlobRecordV2(metadataContentSize, BlobType.MetadataBlob, metadataContent, blob); Assert.assertEquals(metadataContentSize, blobData.getSize()); byte[] verify = new byte[metadataContentSize]; - blobData.getAndRelease().readBytes(verify); + blobData.content().readBytes(verify); Assert.assertArrayEquals("Metadata content mismatch", metadataContent.array(), verify); + blobData.release(); metadataContent.rewind(); CompositeBlobInfo compositeBlobInfo = deserializeMetadataContentV3(metadataContent, new MockIdFactory()); diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java index af3fa188a1..b554d02fe3 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageFormatSendTest.java @@ -23,6 +23,7 @@ import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.ByteBufferOutputStream; import com.github.ambry.utils.Crc32; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.TestUtils; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -36,6 +37,7 @@ import java.util.Random; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,6 +49,18 @@ public class MessageFormatSendTest { private final String putFormat; private static short messageFormatHeaderVersionSaved; + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } + @BeforeClass public static void saveMessageFormatHeaderVersionToUse() { messageFormatHeaderVersionSaved = MessageFormatRecord.headerVersionToUse; @@ -475,8 +489,9 @@ private void doSendWriteCompositeMessagesTest(byte[][] blob, byte[][] userMetada Assert.assertEquals(BlobType.DataBlob, deserializedBlob.getBlobData().getBlobType()); Assert.assertEquals(blob[i].length, deserializedBlob.getBlobData().getSize()); byte[] readBlob = new byte[blob[i].length]; - deserializedBlob.getBlobData().getAndRelease().readBytes(readBlob); + deserializedBlob.getBlobData().content().readBytes(readBlob); Assert.assertArrayEquals(blob[i], readBlob); + deserializedBlob.getBlobData().release(); if (headerVersions[i] == MessageFormatRecord.Message_Header_Version_V1) { Assert.assertEquals(null, send.getMessageMetadataList().get(i)); diff --git a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java index da672e8dfc..8fe8ed11e3 100644 --- a/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java +++ b/ambry-messageformat/src/test/java/com.github.ambry.messageformat/MessageSievingInputStreamTest.java @@ -977,7 +977,7 @@ private void verifySievedTransformedMessage(MessageSievingInputStream sievedStre Assert.assertEquals(containerId, propsFromStream.getContainerId()); Assert.assertEquals(ByteBuffer.wrap(usermetadata), userMetadataFromStream); Assert.assertEquals(blobType, blobDataFromStream.getBlobType()); - ByteBuf byteBuf = blobDataFromStream.getAndRelease(); + ByteBuf byteBuf = blobDataFromStream.content(); try { Assert.assertEquals(Unpooled.wrappedBuffer(data), byteBuf); } finally { @@ -1086,7 +1086,7 @@ public TransformationOutput transform(Message message) { MessageInfo transformedMsgInfo; PutMessageFormatInputStream transformedStream = new PutMessageFormatInputStream(newKey, encryptionKey, props, metadata, - new ByteBufInputStream(blobData.getAndRelease(), true), blobData.getSize(), blobData.getBlobType()); + new ByteBufInputStream(blobData.content(), true), blobData.getSize(), blobData.getBlobType()); transformedMsgInfo = new MessageInfo(newKey, transformedStream.getSize(), msgInfo.isDeleted(), msgInfo.isTtlUpdated(), msgInfo.getExpirationTimeInMs(), msgInfo.getCrc(), msgInfo.getAccountId(), msgInfo.getContainerId(), diff --git a/ambry-network/src/main/java/com.github.ambry.network/LocalNetworkClient.java b/ambry-network/src/main/java/com.github.ambry.network/LocalNetworkClient.java index e646c974a3..149967ed58 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/LocalNetworkClient.java +++ b/ambry-network/src/main/java/com.github.ambry.network/LocalNetworkClient.java @@ -17,6 +17,8 @@ import com.github.ambry.network.LocalRequestResponseChannel.LocalChannelRequest; import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Time; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; import java.nio.ByteBuffer; import java.util.List; import java.util.Set; @@ -80,8 +82,8 @@ public List sendAndPoll(List requestInfos, Set responseQueue = getResponseQueue(localRequest.processorId); responseQueue.put(responseInfo); logger.debug("Added response for {}, size now {}", localRequest.processorId, responseQueue.size()); @@ -118,17 +119,16 @@ public void shutdown() { } /** - * Utility to extract a byte buffer from a {@link Send} object, skipping the size header. + * Utility to extract a {@link ByteBuf} from a {@link Send} object, skipping the size header. * @param payload the payload whose bytes we want. */ - static ByteBuffer byteBufferFromPayload(Send payload) throws IOException { + static ByteBuf byteBufFromPayload(Send payload) throws IOException { int bufferSize = (int) payload.sizeInBytes() - sizeByteArray.length; - ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + ByteBuf buffer = Unpooled.buffer(bufferSize); // Skip the size header - long bytesWritten = payload.writeTo(new ByteBufferChannel(ByteBuffer.wrap(sizeByteArray))); - WritableByteChannel byteChannel = Channels.newChannel(new ByteBufferOutputStream(buffer)); + payload.writeTo(new ByteBufferChannel(ByteBuffer.wrap(sizeByteArray))); + WritableByteChannel byteChannel = Channels.newChannel(new ByteBufOutputStream(buffer)); payload.writeTo(byteChannel); - buffer.rewind(); return buffer; } diff --git a/ambry-network/src/main/java/com.github.ambry.network/SocketNetworkClient.java b/ambry-network/src/main/java/com.github.ambry.network/SocketNetworkClient.java index fd7a59608a..7adbc4ced1 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/SocketNetworkClient.java +++ b/ambry-network/src/main/java/com.github.ambry.network/SocketNetworkClient.java @@ -355,7 +355,9 @@ private void handleSelectorEvents(List responseInfoList) { connectionTracker.checkInConnection(connId); RequestMetadata requestMetadata = connectionIdToRequestInFlight.remove(connId); correlationIdInFlightToConnectionId.remove(requestMetadata.requestInfo.getRequest().getCorrelationId()); - responseInfoList.add(new ResponseInfo(requestMetadata.requestInfo, null, recv.getReceivedBytes().getAndRelease())); + // This would transfer the ownership of the content from BoundedNettyByteBufReceive to ResponseInfo. + // Don't use this BoundedNettyByteBufReceive anymore. + responseInfoList.add(new ResponseInfo(requestMetadata.requestInfo, null, recv.getReceivedBytes().content())); requestMetadata.onResponseReceive(); } } diff --git a/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java b/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java index 72a473d517..5029d4192f 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java +++ b/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java @@ -13,9 +13,10 @@ */ package com.github.ambry.network; +import com.github.ambry.utils.AbstractByteBufHolder; +import com.github.ambry.utils.NettyByteBufDataInputStream; import com.github.ambry.utils.SystemTime; -import io.netty.util.ReferenceCountUtil; -import java.io.IOException; +import io.netty.buffer.ByteBuf; import java.io.InputStream; import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; @@ -26,19 +27,19 @@ // The request at the network layer -class SocketServerRequest implements NetworkRequest { +class SocketServerRequest extends AbstractByteBufHolder implements NetworkRequest { private final int processor; private final String connectionId; private final InputStream input; private final long startTimeInMs; private static final Logger logger = LoggerFactory.getLogger(SocketServerRequest.class); - private Object buffer; + private ByteBuf content; - public SocketServerRequest(int processor, String connectionId, Object buffer, InputStream input) throws IOException { + public SocketServerRequest(int processor, String connectionId, ByteBuf content) { this.processor = processor; this.connectionId = connectionId; - this.buffer = buffer; - this.input = input; + this.content = content; + this.input = new NettyByteBufDataInputStream(content); this.startTimeInMs = SystemTime.getInstance().milliseconds(); logger.trace("Processor {} received request : {}", processor, connectionId); } @@ -53,14 +54,6 @@ public long getStartTimeInMs() { return startTimeInMs; } - @Override - public void release() { - if (buffer != null) { - ReferenceCountUtil.release(buffer); - buffer = null; - } - } - public int getProcessor() { return processor; } @@ -68,6 +61,16 @@ public int getProcessor() { public String getConnectionId() { return connectionId; } + + @Override + public ByteBuf content() { + return content; + } + + @Override + public SocketServerRequest replace(ByteBuf content) { + return new SocketServerRequest(getProcessor(), getConnectionId(), content); + } } // The response at the network layer @@ -114,7 +117,7 @@ public ServerNetworkResponseMetrics getMetrics() { } interface ResponseListener { - public void onResponse(int processorId); + void onResponse(int processorId); } /** diff --git a/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java b/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java index 52185b19f0..5904c3572d 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java +++ b/ambry-network/src/main/java/com.github.ambry.network/SocketServer.java @@ -20,6 +20,7 @@ import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Time; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketException; @@ -419,9 +420,8 @@ public void run() { List completedReceives = selector.completedReceives(); for (NetworkReceive networkReceive : completedReceives) { String connectionId = networkReceive.getConnectionId(); - Object buffer = networkReceive.getReceivedBytes().getAndRelease(); - SocketServerRequest req = new SocketServerRequest(id, connectionId, buffer, - Utils.createDataInputStreamFromBuffer(buffer, networkConfig.networkPutRequestShareMemory)); + ByteBuf buffer = networkReceive.getReceivedBytes().content(); + SocketServerRequest req = new SocketServerRequest(id, connectionId, buffer); channel.sendRequest(req); } } diff --git a/ambry-network/src/main/java/com.github.ambry.network/Transmission.java b/ambry-network/src/main/java/com.github.ambry.network/Transmission.java index 5598e1a026..04ce475b15 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Transmission.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Transmission.java @@ -16,7 +16,6 @@ import com.github.ambry.config.NetworkConfig; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Time; -import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.SelectionKey; @@ -76,9 +75,7 @@ public void setNetworkSend(NetworkSend networkSend) { } protected void initializeNetworkReceive() { - BoundedReceive boundedReceive = - config.networkUseNettyByteBuf ? new BoundedNettyByteBufReceive() : new BoundedByteBufferReceive(); - networkReceive = new NetworkReceive(getConnectionId(), boundedReceive, time); + networkReceive = new NetworkReceive(getConnectionId(), new BoundedNettyByteBufReceive(), time); } /** @@ -181,7 +178,7 @@ public void clearReceive() { protected void release() { if (networkReceive != null) { - ReferenceCountUtil.release(networkReceive.getReceivedBytes().getAndRelease()); + networkReceive.getReceivedBytes().release(); } } diff --git a/ambry-network/src/test/java/com.github.ambry.network/BoundedByteBufferReceiveTest.java b/ambry-network/src/test/java/com.github.ambry.network/BoundedByteBufferReceiveTest.java deleted file mode 100644 index e50b3ccd42..0000000000 --- a/ambry-network/src/test/java/com.github.ambry.network/BoundedByteBufferReceiveTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright 2016 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.network; - -import com.github.ambry.utils.ByteBufferInputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.util.Random; -import org.junit.Assert; -import org.junit.Test; - - -public class BoundedByteBufferReceiveTest { - - /** - * Test basic operation of {@link BoundedByteBufferReceive}. - * @throws Exception - */ - @Test - public void testBoundedByteBufferReceive() throws Exception { - int bufferSize = 2000; - ByteBuffer buffer = ByteBuffer.allocate(bufferSize); - buffer.putLong(bufferSize); - byte[] buf = new byte[bufferSize - Long.BYTES]; - new Random().nextBytes(buf); - buffer.put(buf); - buffer.flip(); - BoundedByteBufferReceive set = new BoundedByteBufferReceive(); - Assert.assertEquals("Wrong number of bytes read", bufferSize, - set.readFrom(Channels.newChannel(new ByteBufferInputStream(buffer)))); - buffer.clear(); - ByteBuffer payload = set.getAndRelease(); - for (int i = 8; i < bufferSize; i++) { - Assert.assertEquals(buffer.array()[i], payload.get()); - } - } -} diff --git a/ambry-network/src/test/java/com.github.ambry.network/BoundedNettyByteBufReceiveTest.java b/ambry-network/src/test/java/com.github.ambry.network/BoundedNettyByteBufReceiveTest.java index ad593f6fb9..a16ec62b19 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/BoundedNettyByteBufReceiveTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/BoundedNettyByteBufReceiveTest.java @@ -41,7 +41,7 @@ public void testBoundedByteBufferReceive() throws Exception { Assert.assertEquals("Wrong number of bytes read", bufferSize, set.readFrom(Channels.newChannel(new ByteBufferInputStream(buffer)))); buffer.clear(); - ByteBuf payload = set.getAndRelease(); + ByteBuf payload = set.content(); for (int i = 8; i < bufferSize; i++) { Assert.assertEquals(buffer.array()[i], payload.readByte()); } diff --git a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java index 3120fb84c0..dac301c1da 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SSLSelectorTest.java @@ -19,13 +19,14 @@ import com.github.ambry.config.NetworkConfig; import com.github.ambry.config.SSLConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Time; +import io.netty.buffer.ByteBuf; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -37,6 +38,7 @@ import org.conscrypt.Conscrypt; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -55,6 +57,17 @@ public class SSLSelectorTest { private Selector selector; private final File trustStoreFile; private final NetworkConfig networkConfig; + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } @Parameterized.Parameters public static List data() { @@ -212,14 +225,14 @@ public void testNormalOperation() throws Exception { // handle any responses we may have gotten for (NetworkReceive receive : selector.completedReceives()) { - ByteBuffer payload = (ByteBuffer) (receive.getReceivedBytes().getAndRelease()); + ByteBuf payload = receive.getReceivedBytes().content(); String[] pieces = SelectorTest.asString(payload).split("&"); assertEquals("Should be in the form 'conn-counter'", 2, pieces.length); assertEquals("Check the source", receive.getConnectionId(), pieces[0]); - assertEquals("Check that the receive has kindly been rewound", 0, payload.position()); assertTrue("Received connectionId is as expected ", connectionIds.contains(receive.getConnectionId())); assertEquals("Check the request counter", 0, Integer.parseInt(pieces[1])); responseCount++; + receive.getReceivedBytes().release(); } // prepare new sends for the next round @@ -335,8 +348,12 @@ private String blockingRequest(String connectionId, String s) throws Exception { selector.poll(1000L); for (NetworkReceive receive : selector.completedReceives()) { if (receive.getConnectionId().equals(connectionId)) { - ByteBuffer payload = (ByteBuffer) (receive.getReceivedBytes().getAndRelease()); - return SelectorTest.asString(payload); + ByteBuf payload = receive.getReceivedBytes().content(); + try { + return SelectorTest.asString(payload); + } finally { + payload.release(); + } } } } diff --git a/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java b/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java index 013adc206a..a03a4e30b8 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SelectorTest.java @@ -16,7 +16,9 @@ import com.codahale.metrics.MetricRegistry; import com.github.ambry.config.NetworkConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.SystemTime; +import io.netty.buffer.ByteBuf; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -44,6 +46,17 @@ public class SelectorTest { private EchoServer server; private Selector selector; private int selectorExecutorPoolSize; + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } @Parameterized.Parameters public static List data() { @@ -197,15 +210,15 @@ public void testNormalOperation() throws Exception { // handle any responses we may have gotten for (NetworkReceive receive : selector.completedReceives()) { - ByteBuffer payload = (ByteBuffer) (receive.getReceivedBytes().getAndRelease()); + ByteBuf payload = receive.getReceivedBytes().content(); String[] pieces = asString(payload).split("&"); assertEquals("Should be in the form 'conn-counter'", 2, pieces.length); assertEquals("Check the source", receive.getConnectionId(), pieces[0]); - assertEquals("Check that the receive has kindly been rewound", 0, payload.position()); int index = Integer.parseInt(receive.getConnectionId().split("_")[1]); assertEquals("Check the request counter", responses[index], Integer.parseInt(pieces[1])); responses[index]++; // increment the expected counter responseCount++; + receive.getReceivedBytes().release(); } // prepare new sends for the next round @@ -247,8 +260,12 @@ private String blockingRequest(String connectionId, String s) throws Exception { selector.poll(1000L); for (NetworkReceive receive : selector.completedReceives()) { if (receive.getConnectionId() == connectionId) { - ByteBuffer payload = (ByteBuffer) (receive.getReceivedBytes().getAndRelease()); - return asString(payload); + ByteBuf payload = receive.getReceivedBytes().content(); + try { + return asString(payload); + } finally { + receive.getReceivedBytes().release(); + } } } } @@ -272,8 +289,11 @@ static NetworkSend createSend(String connectionId, String s) { return new NetworkSend(connectionId, new BoundedByteBufferSend(buf), null, SystemTime.getInstance()); } - static String asString(ByteBuffer payload) { - return new String(payload.array(), payload.arrayOffset()); + static String asString(ByteBuf payload) { + ByteBuffer buffer = ByteBuffer.allocate(payload.readableBytes()); + payload.readBytes(buffer); + buffer.flip(); + return new String(buffer.array(), buffer.arrayOffset()); } /** diff --git a/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java b/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java index 1b25aa5df3..dd67d9f99c 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SocketNetworkClientTest.java @@ -25,7 +25,6 @@ import com.github.ambry.utils.Time; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -45,14 +44,11 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; /** * Test the {@link SocketNetworkClient} */ -@RunWith(Parameterized.class) public class SocketNetworkClientTest { private static final int CHECKOUT_TIMEOUT_MS = 1000; private static final int MAX_PORTS_PLAIN_TEXT = 3; @@ -72,14 +68,8 @@ public class SocketNetworkClientTest { private List localSslDataNodes; private MockClusterMap sslEnabledClusterMap; private MockClusterMap sslDisabledClusterMap; - private boolean usingNettyByteBuffer; private NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); - @Parameterized.Parameters - public static List data() { - return Arrays.asList(new Object[][]{{false}, {true}}); - } - @Before public void before() { nettyByteBufLeakHelper.beforeTest(); @@ -106,18 +96,17 @@ public void testNetworkClientFactory() throws IOException { Assert.assertNotNull("SocketNetworkClient returned should be non-null", networkClientFactory.getNetworkClient()); } - public SocketNetworkClientTest(boolean usingNettyByteBuffer) throws IOException { - this.usingNettyByteBuffer = usingNettyByteBuffer; + public SocketNetworkClientTest() throws IOException { Properties props = new Properties(); props.setProperty(NetworkConfig.NETWORK_CLIENT_ENABLE_CONNECTION_REPLENISHMENT, "true"); - props.setProperty(NetworkConfig.NETWORK_USE_NETTY_BYTE_BUF, usingNettyByteBuffer ? "true" : "false"); VerifiableProperties vprops = new VerifiableProperties(props); NetworkConfig networkConfig = new NetworkConfig(vprops); selector = new MockSelector(networkConfig); time = new MockTime(); networkMetrics = new NetworkMetrics(new MetricRegistry()); - networkClient = new SocketNetworkClient(selector, networkConfig, networkMetrics, MAX_PORTS_PLAIN_TEXT, MAX_PORTS_SSL, - CHECKOUT_TIMEOUT_MS, time); + networkClient = + new SocketNetworkClient(selector, networkConfig, networkMetrics, MAX_PORTS_PLAIN_TEXT, MAX_PORTS_SSL, + CHECKOUT_TIMEOUT_MS, time); sslEnabledClusterMap = new MockClusterMap(true, 9, 3, 3, false); localSslDataNodes = sslEnabledClusterMap.getDataNodeIds() .stream() @@ -136,12 +125,6 @@ public SocketNetworkClientTest(boolean usingNettyByteBuffer) throws IOException replicaOnSslNode = sslEnabledClusterMap.getReplicaIds(dataNodeId).get(0); } - @After - public void cleanup() { - // force JVM to gc to trigger more intense resource leak detection. - System.gc(); - } - /** * Test {@link SocketNetworkClient#warmUpConnections(List, int, long, List)} */ @@ -195,12 +178,11 @@ public void testBasicSendAndPoll() { for (ResponseInfo responseInfo : responseInfoList) { MockSend send = (MockSend) responseInfo.getRequestInfo().getRequest(); NetworkClientErrorCode error = responseInfo.getError(); - Object response = responseInfo.getResponse(); + ByteBuf response = responseInfo.content(); Assert.assertNull("Should not have encountered an error", error); Assert.assertNotNull("Should receive a valid response", response); int correlationIdInRequest = send.getCorrelationId(); - int correlationIdInResponse = - usingNettyByteBuffer ? ((ByteBuf) response).readInt() : ((ByteBuffer) response).getInt(); + int correlationIdInResponse = response.readInt(); Assert.assertEquals("Received response for the wrong request", correlationIdInRequest, correlationIdInResponse); responseCount++; responseInfo.release(); @@ -218,7 +200,7 @@ public void testBasicSendAndPoll() { * Tests a failure scenario where requests remain too long in the {@link SocketNetworkClient}'s pending requests queue. */ @Test - public void testConnectionUnavailable() throws InterruptedException { + public void testConnectionUnavailable() { List requestInfoList = new ArrayList<>(); List responseInfoList; requestInfoList.add(new RequestInfo(sslHost, sslPort, new MockSend(3), replicaOnSslNode)); @@ -240,7 +222,7 @@ public void testConnectionUnavailable() throws InterruptedException { requestInfoList.clear(); for (ResponseInfo responseInfo : responseInfoList) { NetworkClientErrorCode error = responseInfo.getError(); - Object response = responseInfo.getResponse(); + ByteBuf response = responseInfo.content(); Assert.assertNotNull("Should have encountered an error", error); Assert.assertEquals("Should have received a connection unavailable error", NetworkClientErrorCode.ConnectionUnavailable, error); @@ -274,7 +256,7 @@ public void testNetworkError() { requestInfoList.clear(); for (ResponseInfo responseInfo : responseInfoList) { NetworkClientErrorCode error = responseInfo.getError(); - Object response = responseInfo.getResponse(); + ByteBuf response = responseInfo.content(); Assert.assertNotNull("Should have encountered an error", error); Assert.assertEquals("Should have received a connection unavailable error", NetworkClientErrorCode.NetworkError, error); @@ -467,17 +449,16 @@ public void testRequestDropping() { MockSend send = (MockSend) responseInfo.getRequestInfo().getRequest(); if (send.getCorrelationId() == 1) { NetworkClientErrorCode error = responseInfo.getError(); - Object response = responseInfo.getResponse(); + ByteBuf response = responseInfo.content(); Assert.assertNull("Should not have encountered an error", error); Assert.assertNotNull("Should receive a valid response", response); int correlationIdInRequest = send.getCorrelationId(); - int correlationIdInResponse = - usingNettyByteBuffer ? ((ByteBuf) response).readInt() : ((ByteBuffer) response).getInt(); + int correlationIdInResponse = response.readInt(); Assert.assertEquals("Received response for the wrong request", correlationIdInRequest, correlationIdInResponse); } else { Assert.assertEquals("Expected connection unavailable on dropped request", NetworkClientErrorCode.ConnectionUnavailable, responseInfo.getError()); - Assert.assertNull("Should not receive a response", responseInfo.getResponse()); + Assert.assertNull("Should not receive a response", responseInfo.content()); } } responseInfoList.forEach(ResponseInfo::release); @@ -496,18 +477,17 @@ public void testRequestDropping() { MockSend send = (MockSend) responseInfo.getRequestInfo().getRequest(); if (send.getCorrelationId() != 4) { NetworkClientErrorCode error = responseInfo.getError(); - Object response = responseInfo.getResponse(); + ByteBuf response = responseInfo.content(); Assert.assertNull("Should not have encountered an error", error); Assert.assertNotNull("Should receive a valid response", response); int correlationIdInRequest = send.getCorrelationId(); - int correlationIdInResponse = - usingNettyByteBuffer ? ((ByteBuf) response).readInt() : ((ByteBuffer) response).getInt(); + int correlationIdInResponse = response.readInt(); Assert.assertEquals("Received response for the wrong request", correlationIdInRequest, correlationIdInResponse); responseInfo.release(); } else { Assert.assertEquals("Expected network error (from closed connection for dropped request)", NetworkClientErrorCode.NetworkError, responseInfo.getError()); - Assert.assertNull("Should not receive a response", responseInfo.getResponse()); + Assert.assertNull("Should not receive a response", responseInfo.content()); responseInfo.release(); } } @@ -730,67 +710,16 @@ public String toString() { /** * A mock implementation of {@link BoundedNettyByteBufReceive} that constructs a buffer with the passed in correlation - * id and returns that buffer as part of {@link #getAndRelease()}. + * id. */ class MockBoundedNettyByteBufReceive extends BoundedNettyByteBufReceive { - private ByteBuf buf; /** * Construct a MockBoundedByteBufferReceive with the given correlation id. * @param correlationId the correlation id associated with this object. */ public MockBoundedNettyByteBufReceive(int correlationId) { - buf = ByteBufAllocator.DEFAULT.heapBuffer(16); - buf.writeInt(correlationId); - } - - /** - * Return the buffer associated with this object. - * @return the buffer associated with this object. - */ - @Override - public ByteBuf getAndRelease() { - if (buf == null) { - return null; - } else { - try { - return buf.retainedDuplicate(); - } finally { - buf.release(); - buf = null; - } - } - } -} - -/** - * A mock implementation of {@link BoundedByteBufferReceive} that constructs a buffer with the passed in correlation - * id and returns that buffer as part of {@link #getAndRelease()}. - */ -class MockBoundedByteBufferReceive extends BoundedByteBufferReceive { - private ByteBuffer buf; - - /** - * Construct a MockBoundedByteBufferReceive with the given correlation id. - * @param correlationId the correlation id associated with this object. - */ - public MockBoundedByteBufferReceive(int correlationId) { - buf = ByteBuffer.allocate(16); - buf.putInt(correlationId); - buf.rewind(); - } - - /** - * Return the buffer associated with this object. - * @return the buffer associated with this object. - */ - @Override - public ByteBuffer getAndRelease() { - try { - return buf; - } finally { - buf = null; - } + super(ByteBufAllocator.DEFAULT.heapBuffer(16).writeInt(correlationId), (long) 16); } } @@ -849,7 +778,6 @@ class MockSelector extends Selector { private boolean wakeUpCalled = false; private int connectCallCount = 0; private boolean isOpen = true; - private boolean usingNettyByteBuf; /** * Create a MockSelector @@ -858,7 +786,6 @@ class MockSelector extends Selector { MockSelector(NetworkConfig networkConfig) throws IOException { super(new NetworkMetrics(new MetricRegistry()), new MockTime(), null, networkConfig); super.close(); - this.usingNettyByteBuf = networkConfig.networkUseNettyByteBuf; } /** @@ -938,10 +865,8 @@ public void poll(long timeoutMs, List sends) throws IOException { if (state == MockSelectorState.DisconnectOnSend) { disconnected.add(send.getConnectionId()); } else if (!closedConnections.contains(send.getConnectionId())) { - BoundedReceive boundedReceive = - usingNettyByteBuf ? new MockBoundedNettyByteBufReceive(mockSend.getCorrelationId()) - : new MockBoundedByteBufferReceive(mockSend.getCorrelationId()); - receives.add(new NetworkReceive(send.getConnectionId(), boundedReceive, new MockTime())); + receives.add(new NetworkReceive(send.getConnectionId(), + new MockBoundedNettyByteBufReceive(mockSend.getCorrelationId()), new MockTime())); } } } @@ -1030,7 +955,7 @@ public void close(String conn) { receives.removeIf((receive) -> { boolean r = conn.equals(receive.getConnectionId()); if (r) { - ReferenceCountUtil.release(receive.getReceivedBytes().getAndRelease()); + receive.getReceivedBytes().release(); } return r; }); diff --git a/ambry-network/src/test/java/com.github.ambry.network/SocketRequestResponseChannelTest.java b/ambry-network/src/test/java/com.github.ambry.network/SocketRequestResponseChannelTest.java index 1748f476f3..3bab1b37df 100644 --- a/ambry-network/src/test/java/com.github.ambry.network/SocketRequestResponseChannelTest.java +++ b/ambry-network/src/test/java/com.github.ambry.network/SocketRequestResponseChannelTest.java @@ -13,17 +13,31 @@ */ package com.github.ambry.network; -import com.github.ambry.utils.ByteBufferInputStream; +import com.github.ambry.utils.NettyByteBufLeakHelper; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.util.Random; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class SocketRequestResponseChannelTest { + private final NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); + + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + + @After + public void after() { + nettyByteBufLeakHelper.afterTest(); + } class ResponseListenerMock implements ResponseListener { public int call = 0; @@ -60,16 +74,18 @@ public void testSocketRequestResponseChannelTest() { SocketRequestResponseChannel channel = new SocketRequestResponseChannel(2, 10); Integer key = new Integer(5); String connectionId = "test_connectionId"; - ByteBuffer buffer = ByteBuffer.allocate(1000); - new Random().nextBytes(buffer.array()); - SocketServerRequest request = new SocketServerRequest(0, connectionId, buffer, new ByteBufferInputStream(buffer)); + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1000); + byte[] content = new byte[1000]; + new Random().nextBytes(content); + buffer.writeBytes(content); + SocketServerRequest request = new SocketServerRequest(0, connectionId, buffer); channel.sendRequest(request); request = (SocketServerRequest) channel.receiveRequest(); Assert.assertEquals(request.getProcessor(), 0); Assert.assertEquals(request.getConnectionId(), connectionId); InputStream stream = request.getInputStream(); for (int i = 0; i < 1000; i++) { - Assert.assertEquals((byte) stream.read(), buffer.array()[i]); + Assert.assertEquals((byte) stream.read(), content[i]); } request.release(); diff --git a/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java b/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java index 105936ebbf..e7dcf96020 100644 --- a/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java +++ b/ambry-replication/src/main/java/com.github.ambry.replication/BlobIdTransformer.java @@ -30,7 +30,6 @@ import com.github.ambry.store.StoreKeyFactory; import com.github.ambry.store.TransformationOutput; import com.github.ambry.store.Transformer; -import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Pair; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; @@ -157,7 +156,7 @@ private Message newMessage(InputStream inputStream, StoreKey newKey, MessageInfo BlobProperties oldProperties = deserializeBlobProperties(inputStream); ByteBuffer userMetaData = deserializeUserMetadata(inputStream); BlobData blobData = deserializeBlob(inputStream); - ByteBuf blobDataBytes = blobData.getAndRelease(); + ByteBuf blobDataBytes = blobData.content(); long blobPropertiesSize = oldProperties.getBlobSize(); @@ -224,6 +223,7 @@ private Message newMessage(InputStream inputStream, StoreKey newKey, MessageInfo } blobPropertiesSize = compositeBlobInfo.getTotalSize(); metadataContent.flip(); + blobDataBytes.release(); blobDataBytes = Unpooled.wrappedBuffer(metadataContent); blobData = new BlobData(blobData.getBlobType(), metadataContent.remaining(), blobDataBytes); } @@ -235,7 +235,7 @@ private Message newMessage(InputStream inputStream, StoreKey newKey, MessageInfo oldProperties.isEncrypted(), null); // BlobIDTransformer only exists on ambry-server and we don't enable netty on ambry server yet. So And blobData.getAndRelease - // will return an Unpooled ByteBuf, it's not not to release it. + // will return an Unpooled ByteBuf, it's not required to release it. // @todo, when enabling netty in ambry-server, release this ByteBuf. PutMessageFormatInputStream putMessageFormatInputStream = new PutMessageFormatInputStream(newKey, blobEncryptionKey, newProperties, userMetaData, diff --git a/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java b/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java index 843e1f7fc1..c6c733bf66 100644 --- a/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java +++ b/ambry-rest/src/main/java/com.github.ambry.rest/RestServer.java @@ -25,7 +25,6 @@ import com.github.ambry.commons.NettyInternalMetrics; import com.github.ambry.commons.SSLFactory; import com.github.ambry.config.NettyConfig; -import com.github.ambry.config.NetworkConfig; import com.github.ambry.config.RestServerConfig; import com.github.ambry.config.VerifiableProperties; import com.github.ambry.notification.NotificationSystem; @@ -208,12 +207,7 @@ public RestServer(VerifiableProperties verifiableProperties, ClusterMap clusterM || nioServer == null) { throw new InstantiationException("Some of the server components were null"); } - NetworkConfig networkConfig = new NetworkConfig(verifiableProperties); - if (networkConfig.networkUseNettyByteBuf) { - nettyInternalMetrics = new NettyInternalMetrics(metricRegistry, new NettyConfig(verifiableProperties)); - } else { - nettyInternalMetrics = null; - } + nettyInternalMetrics = new NettyInternalMetrics(metricRegistry, new NettyConfig(verifiableProperties)); logger.trace("Instantiated RestServer"); } diff --git a/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java b/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java index e9a1b633cb..12949df605 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/DeleteManager.java @@ -142,7 +142,7 @@ void handleResponse(ResponseInfo responseInfo) { long startTime = time.milliseconds(); DeleteResponse deleteResponse = RouterUtils.extractResponseAndNotifyResponseHandler(responseHandler, routerMetrics, responseInfo, - DeleteResponse::readFrom, DeleteResponse::getError, false); + DeleteResponse::readFrom, DeleteResponse::getError); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); int correlationId = ((DeleteRequest) routerRequestInfo.getRequest()).getCorrelationId(); DeleteOperation deleteOperation = correlationIdToDeleteOperation.remove(correlationId); diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java index 1dd29b3f08..143966805d 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetBlobOperation.java @@ -143,7 +143,6 @@ class GetBlobOperation extends GetOperation { firstChunk = new FirstGetChunk(); } - /** * Release all the {@link ByteBuf} in the map. Use {@link ConcurrentHashMap#remove(Object)} method to avoid * conflict with the release call in the chunk async callback. @@ -764,7 +763,7 @@ void handleBody(InputStream payload, MessageMetadata messageMetadata, MessageInf if (!successfullyDeserialized) { BlobData blobData = MessageFormatRecord.deserializeBlob(payload); ByteBuffer encryptionKey = messageMetadata == null ? null : messageMetadata.getEncryptionKey(); - ByteBuf chunkBuf = blobData.getAndRelease(); + ByteBuf chunkBuf = blobData.content(); try { boolean launchedJob = maybeLaunchCryptoJob(chunkBuf, null, encryptionKey, chunkBlobId); @@ -1220,7 +1219,7 @@ void handleBody(InputStream payload, MessageMetadata messageMetadata, MessageInf if (rawMode) { if (blobData != null) { // RawMode, release blob data. - blobData.getAndRelease().release(); + blobData.release(); } // Return the raw bytes from storage if (encryptionKey != null) { @@ -1291,7 +1290,7 @@ RouterErrorCode processServerError(ServerErrorCode errorCode) { */ private void handleMetadataBlob(BlobData blobData, byte[] userMetadata, ByteBuffer encryptionKey) throws IOException, MessageFormatException { - ByteBuf serializedMetadataContent = blobData.getAndRelease(); + ByteBuf serializedMetadataContent = blobData.content(); try { compositeBlobInfo = MetadataContentSerDe.deserializeMetadataContentRecord(serializedMetadataContent.nioBuffer(), blobIdFactory); @@ -1376,8 +1375,8 @@ private void initializeDataChunks() { * @param encryptionKey encryption key for the blob. Could be null for non encrypted blob. */ private void handleSimpleBlob(BlobData blobData, byte[] userMetadata, ByteBuffer encryptionKey) { - ByteBuf chunkBuf = blobData.getAndRelease(); try { + ByteBuf chunkBuf = blobData.content(); boolean rangeResolutionFailure = false; if (encryptionKey == null) { totalSize = blobData.getSize(); @@ -1401,7 +1400,7 @@ private void handleSimpleBlob(BlobData blobData, byte[] userMetadata, ByteBuffer } } } finally { - chunkBuf.release(); + blobData.release(); } } diff --git a/ambry-router/src/main/java/com.github.ambry.router/GetManager.java b/ambry-router/src/main/java/com.github.ambry.router/GetManager.java index 24fd3d2737..e901652054 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/GetManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/GetManager.java @@ -217,7 +217,7 @@ void handleResponse(ResponseInfo responseInfo) { serverError = response.getPartitionResponseInfoList().get(0).getErrorCode(); } return serverError; - }, routerConfig.routerGetBlobOperationShareMemory); + }); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); GetRequest getRequest = (GetRequest) routerRequestInfo.getRequest(); GetOperation getOperation = correlationIdToGetOperation.remove(getRequest.getCorrelationId()); diff --git a/ambry-router/src/main/java/com.github.ambry.router/PutManager.java b/ambry-router/src/main/java/com.github.ambry.router/PutManager.java index 7d2084523a..19cd3a65ff 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/PutManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/PutManager.java @@ -220,7 +220,7 @@ void handleResponse(ResponseInfo responseInfo) { long startTime = time.milliseconds(); PutResponse putResponse = RouterUtils.extractResponseAndNotifyResponseHandler(responseHandler, routerMetrics, responseInfo, - PutResponse::readFrom, PutResponse::getError, false); + PutResponse::readFrom, PutResponse::getError); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); int correlationId = routerRequestInfo.getRequest().getCorrelationId(); // Get the PutOperation that generated the request. diff --git a/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java b/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java index fefcddfb15..cc5dcf730c 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java +++ b/ambry-router/src/main/java/com.github.ambry.router/RouterUtils.java @@ -26,6 +26,7 @@ import com.github.ambry.network.ResponseInfo; import com.github.ambry.protocol.Response; import com.github.ambry.server.ServerErrorCode; +import com.github.ambry.utils.NettyByteBufDataInputStream; import com.github.ambry.utils.Pair; import com.github.ambry.utils.Utils; import java.io.DataInputStream; @@ -185,13 +186,13 @@ static void replaceOperationException(AtomicReference operationExcept */ static R extractResponseAndNotifyResponseHandler(ResponseHandler responseHandler, NonBlockingRouterMetrics routerMetrics, ResponseInfo responseInfo, Deserializer deserializer, - Function errorExtractor, boolean shareMemory) { + Function errorExtractor) { R response = null; ReplicaId replicaId = responseInfo.getRequestInfo().getReplicaId(); NetworkClientErrorCode networkClientErrorCode = responseInfo.getError(); if (networkClientErrorCode == null) { try { - DataInputStream dis = Utils.createDataInputStreamFromBuffer(responseInfo.getResponse(), shareMemory); + DataInputStream dis = new NettyByteBufDataInputStream(responseInfo.content()); response = deserializer.readFrom(dis); responseHandler.onEvent(replicaId, errorExtractor.apply(response)); } catch (Exception e) { diff --git a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java index 11090e8c24..33fd6a00a7 100644 --- a/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java +++ b/ambry-router/src/main/java/com.github.ambry.router/TtlUpdateManager.java @@ -155,7 +155,7 @@ void handleResponse(ResponseInfo responseInfo) { long startTime = time.milliseconds(); TtlUpdateResponse ttlUpdateResponse = RouterUtils.extractResponseAndNotifyResponseHandler(responseHandler, routerMetrics, responseInfo, - TtlUpdateResponse::readFrom, TtlUpdateResponse::getError, false); + TtlUpdateResponse::readFrom, TtlUpdateResponse::getError); RequestInfo routerRequestInfo = responseInfo.getRequestInfo(); int correlationId = ((TtlUpdateRequest) routerRequestInfo.getRequest()).getCorrelationId(); TtlUpdateOperation ttlUpdateOperation = correlationIdToTtlUpdateOperation.remove(correlationId); diff --git a/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java b/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java index 292772182b..dcaa1ab8fd 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/GetBlobInfoOperationTest.java @@ -34,6 +34,8 @@ import com.github.ambry.protocol.GetResponse; import com.github.ambry.server.ServerErrorCode; import com.github.ambry.utils.MockTime; +import com.github.ambry.utils.NettyByteBufDataInputStream; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; import java.io.IOException; @@ -53,6 +55,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -104,6 +107,7 @@ public class GetBlobInfoOperationTest { private ReplicaId localReplica; private ReplicaId remoteReplica; private String localDcName; + private NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); /** * Running for both {@link SimpleOperationTracker} and {@link AdaptiveOperationTracker}, with and without encryption @@ -168,6 +172,11 @@ networkClientFactory, new LoggingNotificationSystem(), mockClusterMap, kms, cryp } } + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + @After public void after() { if (networkClient != null) { @@ -177,6 +186,7 @@ public void after() { if (cryptoJobHandler != null) { cryptoJobHandler.close(); } + nettyByteBufLeakHelper.afterTest(); } /** @@ -246,13 +256,13 @@ public void testPollAndResponseHandling() throws Exception { List responses = sendAndWaitForResponses(requestListToFill); for (ResponseInfo responseInfo : responses) { GetResponse getResponse = responseInfo.getError() == null ? GetResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse()), mockClusterMap) : null; + new NettyByteBufDataInputStream(responseInfo.content()), mockClusterMap) : null; op.handleResponse(responseInfo, getResponse); - responseInfo.release(); if (op.isOperationComplete()) { break; } } + responses.forEach(ResponseInfo::release); if (testEncryption) { Assert.assertTrue("Latch should have been zeroed ", onPollLatch.await(500, TimeUnit.MILLISECONDS)); op.poll(requestRegistrationCallback); @@ -714,13 +724,13 @@ private void completeOp(GetBlobInfoOperation op) throws IOException { List responses = sendAndWaitForResponses(requestRegistrationCallback.getRequestsToSend()); for (ResponseInfo responseInfo : responses) { GetResponse getResponse = responseInfo.getError() == null ? GetResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse()), mockClusterMap) : null; + new NettyByteBufDataInputStream(responseInfo.content()), mockClusterMap) : null; op.handleResponse(responseInfo, getResponse); - responseInfo.release(); if (op.isOperationComplete()) { break; } } + responses.forEach(ResponseInfo::release); } } diff --git a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java index 54be311bd9..4c71ba6755 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/GetBlobOperationTest.java @@ -24,6 +24,7 @@ import com.github.ambry.commons.ByteBufferAsyncWritableChannel; import com.github.ambry.commons.ByteBufferReadableStreamChannel; import com.github.ambry.commons.LoggingNotificationSystem; +import com.github.ambry.utils.NettyByteBufDataInputStream; import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.commons.ResponseHandler; import com.github.ambry.config.CryptoServiceConfig; @@ -120,7 +121,6 @@ public class GetBlobOperationTest { private final RouterCallback routerCallback; private final String operationTrackerType; private final boolean testEncryption; - private final boolean networkUseNetty; private MockKeyManagementService kms = null; private MockCryptoService cryptoService = null; private CryptoJobHandler cryptoJobHandler = null; @@ -181,10 +181,9 @@ public void after() { */ @Parameterized.Parameters public static List data() { - return Arrays.asList(new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false, false}, - {SimpleOperationTracker.class.getSimpleName(), false, true}, - {AdaptiveOperationTracker.class.getSimpleName(), false, false}, - {AdaptiveOperationTracker.class.getSimpleName(), true, false}}); + return Arrays.asList(new Object[][]{{SimpleOperationTracker.class.getSimpleName(), false}, + {AdaptiveOperationTracker.class.getSimpleName(), false}, + {AdaptiveOperationTracker.class.getSimpleName(), true}}); } /** @@ -193,11 +192,9 @@ public static List data() { * @param operationTrackerType the type of {@link OperationTracker} to use. * @param testEncryption {@code true} if blobs need to be tested w/ encryption. {@code false} otherwise */ - public GetBlobOperationTest(String operationTrackerType, boolean testEncryption, boolean networkUseNetty) - throws Exception { + public GetBlobOperationTest(String operationTrackerType, boolean testEncryption) throws Exception { this.operationTrackerType = operationTrackerType; this.testEncryption = testEncryption; - this.networkUseNetty = networkUseNetty; // Defaults. Tests may override these and do new puts as appropriate. maxChunkSize = random.nextInt(1024 * 1024) + 1; // a blob size that is greater than the maxChunkSize and is not a multiple of it. Will result in a composite blob. @@ -287,7 +284,8 @@ private void doDirectPut(BlobType blobType, ByteBuffer blobContent) throws Excep new PutRequest(random.nextInt(), "clientId", blobId, blobProperties, userMetadataBuf.duplicate(), blobContent.duplicate(), blobContent.remaining(), blobType, blobEncryptionKey == null ? null : blobEncryptionKey.duplicate()); - server.send(request); + // Make sure we release the BoundedNettyByteBufReceive. + server.send(request).release(); } } @@ -407,7 +405,7 @@ public void testCompositeBlobRawMode() throws Exception { // extract chunk ids BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(new ByteArrayInputStream(payload.array()), blobIdFactory); - ByteBuf metadataBuffer = blobAll.getBlobData().getAndRelease(); + ByteBuf metadataBuffer = blobAll.getBlobData().content(); try { CompositeBlobInfo compositeBlobInfo = MetadataContentSerDe.deserializeMetadataContentRecord(metadataBuffer.nioBuffer(), blobIdFactory); @@ -574,7 +572,7 @@ public void testRequestTimeoutAndBlobNotFoundLocalTimeout() throws Exception { List responses = sendAndWaitForResponses(requestRegistrationCallback.getRequestsToSend()); for (ResponseInfo responseInfo : responses) { GetResponse getResponse = responseInfo.getError() == null ? GetResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse()), mockClusterMap) : null; + new NettyByteBufDataInputStream(responseInfo.content()), mockClusterMap) : null; op.handleResponse(responseInfo, getResponse); responseInfo.release(); } @@ -629,7 +627,7 @@ public void testTimeoutAndBlobNotFoundInOriginDc() throws Exception { List responses = sendAndWaitForResponses(requestRegistrationCallback.getRequestsToSend()); for (ResponseInfo responseInfo : responses) { GetResponse getResponse = responseInfo.getError() == null ? GetResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse()), mockClusterMap) : null; + new NettyByteBufDataInputStream(responseInfo.content()), mockClusterMap) : null; op.handleResponse(responseInfo, getResponse); responseInfo.release(); } @@ -1434,8 +1432,7 @@ private GetBlobOperation createOperationAndComplete(Callback responses = sendAndWaitForResponses(requestRegistrationCallback.getRequestsToSend()); for (ResponseInfo responseInfo : responses) { - DataInputStream dis = Utils.createDataInputStreamFromBuffer(responseInfo.getResponse(), - routerConfig.routerGetBlobOperationShareMemory); + DataInputStream dis = new NettyByteBufDataInputStream(responseInfo.content()); GetResponse getResponse = responseInfo.getError() == null ? GetResponse.readFrom(dis, mockClusterMap) : null; op.handleResponse(responseInfo, getResponse); responseInfo.release(); @@ -1623,7 +1620,6 @@ private Properties getDefaultNonBlockingRouterProperties(boolean excludeTimeout) properties.setProperty("router.operation.tracker.exclude.timeout.enabled", Boolean.toString(excludeTimeout)); properties.setProperty("router.operation.tracker.terminate.on.not.found.enabled", "true"); properties.setProperty("router.get.blob.operation.share.memory", "true"); - properties.setProperty("network.use.netty.byte.buf", Boolean.toString(networkUseNetty)); return properties; } } diff --git a/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java b/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java index 2e04319ce6..30a07dd6d7 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java +++ b/ambry-router/src/test/java/com.github.ambry.router/MockSelector.java @@ -14,29 +14,19 @@ package com.github.ambry.router; import com.codahale.metrics.MetricRegistry; -import com.github.ambry.commons.ByteBufferReadableStreamChannel; import com.github.ambry.config.NetworkConfig; -import com.github.ambry.config.VerifiableProperties; -import com.github.ambry.network.BoundedByteBufferReceive; import com.github.ambry.network.BoundedNettyByteBufReceive; import com.github.ambry.network.NetworkMetrics; import com.github.ambry.network.NetworkReceive; import com.github.ambry.network.NetworkSend; import com.github.ambry.network.PortType; import com.github.ambry.network.Selector; -import com.github.ambry.utils.ByteBufferChannel; -import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Time; import java.io.IOException; -import java.io.SequenceInputStream; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; @@ -129,24 +119,9 @@ public void poll(long timeoutMs, List sends) throws IOException { disconnected.add(send.getConnectionId()); } else { MockServer server = connIdToServer.get(send.getConnectionId()); - BoundedByteBufferReceive receive = server.send(send.getPayload()); + BoundedNettyByteBufReceive receive = server.send(send.getPayload()); if (receive != null) { - if (!config.networkUseNettyByteBuf) { - receives.add(new NetworkReceive(send.getConnectionId(), receive, time)); - } else { - // Convert a BoundedByteBufferReceive to BoundedNettyByteBufReceive - BoundedNettyByteBufReceive boundedNettyByteBufReceive = new BoundedNettyByteBufReceive(); - ByteBuffer buffer = (ByteBuffer) receive.getAndRelease(); - ByteBuffer sizeBuffer = ByteBuffer.allocate(Long.BYTES); - sizeBuffer.putLong(buffer.remaining() + Long.BYTES); - sizeBuffer.flip(); - ReadableByteChannel channel = Channels.newChannel( - new SequenceInputStream(new ByteBufferInputStream(sizeBuffer), new ByteBufferInputStream(buffer))); - while (!boundedNettyByteBufReceive.isReadComplete()) { - boundedNettyByteBufReceive.readFrom(channel); - } - receives.add(new NetworkReceive(send.getConnectionId(), boundedNettyByteBufReceive, time)); - } + receives.add(new NetworkReceive(send.getConnectionId(), receive, time)); } } } diff --git a/ambry-router/src/test/java/com.github.ambry.router/MockServer.java b/ambry-router/src/test/java/com.github.ambry.router/MockServer.java index 42cc9f4906..047618f8b7 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/MockServer.java +++ b/ambry-router/src/test/java/com.github.ambry.router/MockServer.java @@ -16,7 +16,7 @@ import com.github.ambry.account.Account; import com.github.ambry.account.Container; import com.github.ambry.clustermap.ClusterMap; -import com.github.ambry.network.BoundedByteBufferReceive; +import com.github.ambry.network.BoundedNettyByteBufReceive; import com.github.ambry.server.ServerErrorCode; import com.github.ambry.messageformat.BlobProperties; import com.github.ambry.messageformat.BlobType; @@ -82,12 +82,12 @@ class MockServer { /** * Take in a request in the form of {@link Send} and return a response in the form of a - * {@link BoundedByteBufferReceive}. + * {@link BoundedNettyByteBufReceive}. * @param send the request. * @return the response. * @throws IOException if there was an error in interpreting the request. */ - public BoundedByteBufferReceive send(Send send) throws IOException { + public BoundedNettyByteBufReceive send(Send send) throws IOException { if (!shouldRespond) { return null; } @@ -116,9 +116,9 @@ public BoundedByteBufferReceive send(Send send) throws IOException { response.writeTo(channel); ByteBuffer payload = channel.getBuffer(); payload.flip(); - BoundedByteBufferReceive boundedByteBufferReceive = new BoundedByteBufferReceive(); - boundedByteBufferReceive.readFrom(Channels.newChannel(new ByteBufferInputStream(payload))); - return boundedByteBufferReceive; + BoundedNettyByteBufReceive receive = new BoundedNettyByteBufReceive(); + receive.readFrom(Channels.newChannel(new ByteBufferInputStream(payload))); + return receive; } /** diff --git a/ambry-router/src/test/java/com.github.ambry.router/NonBlockingRouterTest.java b/ambry-router/src/test/java/com.github.ambry.router/NonBlockingRouterTest.java index ff9f5bc34d..39a84b215e 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/NonBlockingRouterTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/NonBlockingRouterTest.java @@ -39,11 +39,12 @@ import com.github.ambry.protocol.RequestOrResponseType; import com.github.ambry.server.ServerErrorCode; import com.github.ambry.utils.MockTime; +import com.github.ambry.utils.NettyByteBufLeakHelper; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.TestUtils; import com.github.ambry.utils.Utils; +import io.netty.buffer.ByteBuf; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -68,6 +69,7 @@ import java.util.stream.LongStream; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -120,6 +122,7 @@ public class NonBlockingRouterTest { byte[] putUserMetadata; byte[] putContent; ReadableStreamChannel putChannel; + private NettyByteBufLeakHelper nettyByteBufLeakHelper = new NettyByteBufLeakHelper(); /** * Running for both regular and encrypted blobs, and versions 2 and 3 of MetadataContent @@ -152,9 +155,15 @@ public NonBlockingRouterTest(boolean testEncryption, int metadataContentVersion) accountService = new InMemAccountService(false, true); } + @Before + public void before() { + nettyByteBufLeakHelper.beforeTest(); + } + @After public void after() { Assert.assertEquals("Current operations count should be 0", 0, NonBlockingRouter.currentOperationsCount.get()); + nettyByteBufLeakHelper.afterTest(); } /** @@ -1083,8 +1092,8 @@ private void testNoResponseNoNotification(OperationHelper opHelper, List()), null, null // make 1st request of first chunk encounter Temporarily_Disabled mockServer.setServerErrorForAllRequests(ServerErrorCode.Temporarily_Disabled); ResponseInfo responseInfo = getResponseInfo(requestInfos.get(0)); - PutResponse putResponse = responseInfo.getError() == null ? PutResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse())) : null; + PutResponse putResponse = + responseInfo.getError() == null ? PutResponse.readFrom(new NettyByteBufDataInputStream(responseInfo.content())) + : null; op.handleResponse(responseInfo, putResponse); + responseInfo.release(); PutOperation.PutChunk putChunk = op.getPutChunks().get(0); SimpleOperationTracker operationTracker = (SimpleOperationTracker) putChunk.getOperationTrackerInUse(); Assert.assertEquals("Disabled count should be 1", 1, operationTracker.getDisabledCount()); @@ -221,9 +240,11 @@ null, new RouterCallback(new MockNetworkClient(), new ArrayList<>()), null, null // make 2nd request of first chunk encounter Replica_Unavailable mockServer.setServerErrorForAllRequests(ServerErrorCode.Replica_Unavailable); responseInfo = getResponseInfo(requestInfos.get(1)); - putResponse = responseInfo.getError() == null ? PutResponse.readFrom( - Utils.createDataInputStreamFromBuffer(responseInfo.getResponse())) : null; + putResponse = + responseInfo.getError() == null ? PutResponse.readFrom(new NettyByteBufDataInputStream(responseInfo.content())) + : null; op.handleResponse(responseInfo, putResponse); + responseInfo.release(); putChunk = op.getPutChunks().get(0); Assert.assertEquals("Failure count should be 1", 1, ((SimpleOperationTracker) putChunk.getOperationTrackerInUse()).getFailedCount()); @@ -327,7 +348,7 @@ private void resetCorrelationId(byte[] request) { */ private ResponseInfo getResponseInfo(RequestInfo requestInfo) throws IOException { NetworkReceive networkReceive = new NetworkReceive(null, mockServer.send(requestInfo.getRequest()), time); - return new ResponseInfo(requestInfo, null, networkReceive.getReceivedBytes().getAndRelease()); + return new ResponseInfo(requestInfo, null, networkReceive.getReceivedBytes().content()); } } diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java index dc0d8023f1..395f7d638b 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerHardDeleteTest.java @@ -439,7 +439,7 @@ void getAndVerify(ConnectedChannel channel, int blobsCount) throws Exception { BlobData blobData = MessageFormatRecord.deserializeBlob(resp.getInputStream()); Assert.assertEquals(properties.get(i).getBlobSize(), blobData.getSize()); byte[] dataOutput = new byte[(int) blobData.getSize()]; - ByteBuf buffer = blobData.getAndRelease(); + ByteBuf buffer = blobData.content(); try { buffer.readBytes(dataOutput); } finally { diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java index 6bd8385594..b5de36abb6 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/ServerTestUtil.java @@ -132,7 +132,7 @@ final class ServerTestUtil { static byte[] getBlobDataAndRelease(BlobData blobData) { byte[] actualBlobData = new byte[(int) blobData.getSize()]; - ByteBuf buffer = blobData.getAndRelease(); + ByteBuf buffer = blobData.content(); try { buffer.readBytes(actualBlobData); } finally { diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java b/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java index 5607f9916b..f468821817 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/Verifier.java @@ -199,7 +199,7 @@ public void run() { try { BlobData blobData = MessageFormatRecord.deserializeBlob(resp.getInputStream()); byte[] blobout = new byte[(int) blobData.getSize()]; - ByteBuf buffer = blobData.getAndRelease(); + ByteBuf buffer = blobData.content(); try { buffer.readBytes(blobout); } finally { @@ -231,7 +231,7 @@ public void run() { BlobAll blobAll = MessageFormatRecord.deserializeBlobAll(resp.getInputStream(), new BlobIdFactory(clusterMap)); byte[] blobout = new byte[(int) blobAll.getBlobData().getSize()]; - ByteBuf buffer = blobAll.getBlobData().getAndRelease(); + ByteBuf buffer = blobAll.getBlobData().content(); try { buffer.readBytes(blobout); } finally { diff --git a/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java b/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java index 2f55dbaf44..3512c9998e 100644 --- a/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java +++ b/ambry-server/src/test/java/com.github.ambry.server/AmbryServerRequestsTest.java @@ -135,18 +135,16 @@ public class AmbryServerRequestsTest { private final ReplicationConfig replicationConfig; private final ServerConfig serverConfig; private final ReplicaStatusDelegate mockDelegate = Mockito.mock(ReplicaStatusDelegate.class); - private final boolean putRequestShareMemory; private final boolean validateRequestOnStoreState; private AmbryServerRequests ambryRequests; @Parameterized.Parameters public static List data() { - return Arrays.asList(new Object[][]{{false, false}, {true, false}, {false, true}, {true, true}}); + return Arrays.asList(new Object[][]{{false}, {true}}); } - public AmbryServerRequestsTest(boolean putRequestShareMemory, boolean validateRequestOnStoreState) + public AmbryServerRequestsTest(boolean validateRequestOnStoreState) throws IOException, ReplicationException, StoreException, InterruptedException, ReflectiveOperationException { - this.putRequestShareMemory = putRequestShareMemory; this.validateRequestOnStoreState = validateRequestOnStoreState; clusterMap = new MockClusterMap(); Properties properties = createProperties(validateRequestOnStoreState, true); @@ -858,7 +856,7 @@ public void undeleteTest() throws Exception { */ private Response sendRequestGetResponse(RequestOrResponse request, ServerErrorCode expectedServerErrorCode) throws InterruptedException, IOException { - NetworkRequest mockRequest = MockRequest.fromRequest(request, this.putRequestShareMemory); + NetworkRequest mockRequest = MockRequest.fromRequest(request); ambryRequests.handleRequests(mockRequest); assertEquals("Request accompanying response does not match original request", mockRequest, requestResponseChannel.lastOriginalRequest); @@ -1471,13 +1469,13 @@ private static class MockRequest implements NetworkRequest { * @return an instance of {@link MockRequest} that represents {@code request}. * @throws IOException */ - static MockRequest fromRequest(RequestOrResponse request, boolean shareMemory) throws IOException { + static MockRequest fromRequest(RequestOrResponse request) throws IOException { ByteBuffer buffer = ByteBuffer.allocate((int) request.sizeInBytes()); request.writeTo(new ByteBufferChannel(buffer)); buffer.flip(); // read length (to bring it to a state where AmbryRequests can handle it). buffer.getLong(); - return new MockRequest(shareMemory ? new ByteBufferDataInputStream(buffer) : new ByteBufferInputStream(buffer)); + return new MockRequest(new ByteBufferDataInputStream(buffer)); } /** diff --git a/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java b/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java index 2b0028d0e5..e595e0dedb 100644 --- a/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java +++ b/ambry-tools/src/main/java/com.github.ambry/store/HardDeleteVerifier.java @@ -29,7 +29,6 @@ import com.github.ambry.utils.CrcInputStream; import com.github.ambry.utils.Utils; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; @@ -458,7 +457,7 @@ private void verify(String dataDir) throws Exception { } if (isDeleted) { - ByteBuf byteBuf = output.getAndRelease(); + ByteBuf byteBuf = output.content(); try { if (!verifyZeroed(metadata.array()) || !verifyZeroed( Utils.readBytesFromByteBuf(byteBuf, new byte[(int) output.getSize()], 0, @@ -478,7 +477,6 @@ private void verify(String dataDir) throws Exception { } } finally { byteBuf.release(); - byteBuf = null; } } else { unDeletedPuts++; @@ -735,7 +733,7 @@ boolean deserializeUserMetadataAndBlob(InputStream streamlog, InputStream oldStr if (!caughtException) { if (isDeleted) { - ByteBuf byteBuf = blobData.getAndRelease(); + ByteBuf byteBuf = blobData.content(); try { asExpected = verifyZeroed(usermetadata.array()) && verifyZeroed( Utils.readBytesFromByteBuf(byteBuf, new byte[(int) blobData.getSize()], 0, (int) blobData.getSize())); @@ -743,11 +741,10 @@ boolean deserializeUserMetadataAndBlob(InputStream streamlog, InputStream oldStr asExpected = false; } finally { byteBuf.release(); - byteBuf = null; } } else { - ByteBuf byteBuf = blobData.getAndRelease(); - ByteBuf oldByteBuf = oldBlobData.getAndRelease(); + ByteBuf byteBuf = blobData.content(); + ByteBuf oldByteBuf = oldBlobData.content(); try { asExpected = Arrays.equals(usermetadata.array(), oldUsermetadata.array()) && Arrays.equals( Utils.readBytesFromByteBuf(byteBuf, new byte[(int) blobData.getSize()], 0, (int) blobData.getSize()), @@ -758,8 +755,6 @@ boolean deserializeUserMetadataAndBlob(InputStream streamlog, InputStream oldStr } finally { byteBuf.release(); oldByteBuf.release(); - byteBuf = null; - oldByteBuf = null; } } return asExpected; diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java b/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java index 712664ac2f..c8af61a3a7 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/admin/BlobValidator.java @@ -416,7 +416,7 @@ private ServerResponse getRecordFromNode(DataNodeId dataNodeId, BlobId blobId, G ServerErrorCode errorCode = response.getFirst(); if (errorCode == ServerErrorCode.No_Error) { BlobAll blobAll = response.getSecond(); - ByteBuf buffer = blobAll.getBlobData().getAndRelease(); + ByteBuf buffer = blobAll.getBlobData().content(); byte[] blobBytes = new byte[buffer.readableBytes()]; buffer.readBytes(blobBytes); buffer.release(); diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java b/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java index bee47864da..470a6a8b5d 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/admin/ServerAdminTool.java @@ -55,6 +55,7 @@ import com.github.ambry.server.ServerErrorCode; import com.github.ambry.store.StoreKeyFactory; import com.github.ambry.tools.util.ToolUtils; +import com.github.ambry.utils.NettyByteBufDataInputStream; import com.github.ambry.utils.Pair; import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Time; @@ -316,7 +317,7 @@ public static void main(String[] args) throws Exception { serverAdminTool.getBlob(dataNodeId, blobId, config.getOption, clusterMap); if (bResponse.getFirst() == ServerErrorCode.No_Error) { LOGGER.info("Blob type of {} from {} is {}", blobId, dataNodeId, bResponse.getSecond().getBlobType()); - ByteBuf buffer = bResponse.getSecond().getAndRelease(); + ByteBuf buffer = bResponse.getSecond().content(); try { writeByteBufToFile(buffer, outputFileStream); } finally { @@ -656,7 +657,7 @@ public ServerErrorCode triggerCompaction(DataNodeId dataNodeId, PartitionId part new AdminRequest(AdminRequestOrResponseType.TriggerCompaction, partitionId, correlationId.incrementAndGet(), CLIENT_ID); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, adminRequest); - AdminResponse adminResponse = AdminResponse.readFrom(Utils.createDataInputStreamFromBuffer(response.getResponse())); + AdminResponse adminResponse = AdminResponse.readFrom(new NettyByteBufDataInputStream(response.content())); response.release(); return adminResponse.getError(); } @@ -679,7 +680,7 @@ public ServerErrorCode controlRequest(DataNodeId dataNodeId, PartitionId partiti CLIENT_ID); RequestControlAdminRequest controlRequest = new RequestControlAdminRequest(toControl, enable, adminRequest); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, controlRequest); - AdminResponse adminResponse = AdminResponse.readFrom(Utils.createDataInputStreamFromBuffer(response.getResponse())); + AdminResponse adminResponse = AdminResponse.readFrom(new NettyByteBufDataInputStream(response.content())); response.release(); return adminResponse.getError(); } @@ -702,7 +703,7 @@ public ServerErrorCode controlReplication(DataNodeId dataNodeId, PartitionId par CLIENT_ID); ReplicationControlAdminRequest controlRequest = new ReplicationControlAdminRequest(origins, enable, adminRequest); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, controlRequest); - AdminResponse adminResponse = AdminResponse.readFrom(Utils.createDataInputStreamFromBuffer(response.getResponse())); + AdminResponse adminResponse = AdminResponse.readFrom(new NettyByteBufDataInputStream(response.content())); response.release(); return adminResponse.getError(); } @@ -728,7 +729,7 @@ private ServerErrorCode controlBlobStore(DataNodeId dataNodeId, PartitionId part BlobStoreControlAdminRequest controlRequest = new BlobStoreControlAdminRequest(numReplicasCaughtUpPerPartition, storeControlRequestType, adminRequest); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, controlRequest); - AdminResponse adminResponse = AdminResponse.readFrom(Utils.createDataInputStreamFromBuffer(response.getResponse())); + AdminResponse adminResponse = AdminResponse.readFrom(new NettyByteBufDataInputStream(response.content())); response.release(); return adminResponse.getError(); } @@ -756,7 +757,7 @@ public Pair isCaughtUp(DataNodeId dataNodeId, Partitio new CatchupStatusAdminRequest(acceptableLagInBytes, numReplicasCaughtUpPerPartition, adminRequest); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, catchupStatusRequest); CatchupStatusAdminResponse adminResponse = - CatchupStatusAdminResponse.readFrom(Utils.createDataInputStreamFromBuffer(response.getResponse())); + CatchupStatusAdminResponse.readFrom(new NettyByteBufDataInputStream(response.content())); response.release(); return new Pair<>(adminResponse.getError(), adminResponse.getError() == ServerErrorCode.No_Error && adminResponse.isCaughtUp()); @@ -784,7 +785,7 @@ private Pair getGetResponse(DataNodeId dataNodeId, GetRequest getRequest = new GetRequest(correlationId.incrementAndGet(), CLIENT_ID, flags, partitionRequestInfos, getOption); ResponseInfo response = sendRequestGetResponse(dataNodeId, partitionId, getRequest); - InputStream serverResponseStream = Utils.createDataInputStreamFromBuffer(response.getResponse()); + InputStream serverResponseStream = new NettyByteBufDataInputStream(response.content()); response.release(); GetResponse getResponse = GetResponse.readFrom(new DataInputStream(serverResponseStream), clusterMap); ServerErrorCode partitionErrorCode = getResponse.getPartitionResponseInfoList().get(0).getErrorCode(); diff --git a/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java b/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java index e12bad1f8c..12c5aeeaf3 100644 --- a/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java +++ b/ambry-tools/src/main/java/com.github.ambry/tools/perf/ServerReadPerformance.java @@ -261,12 +261,11 @@ public void run() { long sizeRead = 0; byte[] outputBuffer = new byte[(int) blobData.getSize()]; ByteBufferOutputStream streamOut = new ByteBufferOutputStream(ByteBuffer.wrap(outputBuffer)); - ByteBuf buffer = blobData.getAndRelease(); + ByteBuf buffer = blobData.content(); try { buffer.readBytes(streamOut, (int) blobData.getSize()); } finally { buffer.release(); - buffer = null; } long latencyPerBlob = SystemTime.getInstance().nanoseconds() - startTimeGetBlob; totalTimeTaken.addAndGet(latencyPerBlob); diff --git a/ambry-utils/src/main/java/com.github.ambry.utils/AbstractByteBufHolder.java b/ambry-utils/src/main/java/com.github.ambry.utils/AbstractByteBufHolder.java new file mode 100644 index 0000000000..615462e59e --- /dev/null +++ b/ambry-utils/src/main/java/com.github.ambry.utils/AbstractByteBufHolder.java @@ -0,0 +1,96 @@ +/** + * Copyright 2020 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package com.github.ambry.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; + + +/** + * An abstract class that implements most of the {@link ByteBufHolder} interface so the subclass of this class + * only have to provide necessary implementation of a few methods. + * @param The subclass type. + */ +public abstract class AbstractByteBufHolder implements ByteBufHolder { + + @Override + public abstract ByteBuf content(); + + @Override + public abstract T replace(ByteBuf content); + + @Override + public T copy() { + return replace(content().copy()); + } + + @Override + public T duplicate() { + return replace(content().duplicate()); + } + + @Override + public T retainedDuplicate() { + return replace(content().retainedDuplicate()); + } + + @Override + public int refCnt() { + return content().refCnt(); + } + + @Override + public T retain() { + content().retain(); + return (T) this; + } + + @Override + public T retain(int increment) { + content().retain(increment); + return (T) this; + } + + @Override + public T touch() { + if (content() != null) { + content().touch(); + } + return (T) this; + } + + @Override + public T touch(Object hint) { + if (content() != null) { + content().touch(hint); + } + return (T) this; + } + + @Override + public boolean release() { + if (content() != null) { + return content().release(); + } + return false; + } + + @Override + public boolean release(int decrement) { + if (content() != null) { + content().release(decrement); + } + return false; + } +} diff --git a/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java b/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java index 21fa821ee7..63b42da6bf 100644 --- a/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java +++ b/ambry-utils/src/main/java/com.github.ambry.utils/Utils.java @@ -15,7 +15,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; import java.io.BufferedReader; import java.io.DataInputStream; @@ -219,28 +218,6 @@ public static ByteBuffer readShortBuffer(DataInputStream input) throws IOExcepti return buffer; } - /** - * A helper function to return a {@link ByteBuffer} from given {@link ByteBufferDataInputStream} at the given size. - * The returned {@link ByteBuffer} will share the memory with the underlying {@link ByteBuffer} in {@link ByteBufferDataInputStream}. - * @param stream The {@link ByteBufferDataInputStream} to read {@link ByteBuffer} out. - * @param dataSize The size of {@link ByteBuffer}. - * @return The {@link ByteBuffer} - * @throws IOException Unexpected IO errors. - */ - private static ByteBuffer getByteBufferFromByteBufferDataInputStream(ByteBufferDataInputStream stream, int dataSize) - throws IOException { - ByteBuffer byteBuffer = stream.getBuffer(); - int startIndex = byteBuffer.position(); - int oldLimit = byteBuffer.limit(); - - byteBuffer.limit(startIndex + dataSize); - ByteBuffer dataBuffer = byteBuffer.slice(); - byteBuffer.limit(oldLimit); - // Change the byte buffer's position as if the data is fetched. - byteBuffer.position(startIndex + dataSize); - return dataBuffer; - } - /** * Create a {@link ByteBufferInputStream} from the {@link CrcInputStream} by sharing the underlying memory if the * crcStream is built upon a {@link ByteBufferDataInputStream}. @@ -288,10 +265,7 @@ public static ByteBuffer getByteBufferFromInputStream(InputStream stream, int da public static ByteBuffer readByteBufferFromCrcInputStream(CrcInputStream crcStream, int dataSize) throws IOException { ByteBuffer output; InputStream inputStream = crcStream.getUnderlyingInputStream(); - if (inputStream instanceof ByteBufferDataInputStream) { - output = getByteBufferFromByteBufferDataInputStream((ByteBufferDataInputStream) inputStream, dataSize); - crcStream.updateCrc(output.duplicate()); - } else if (inputStream instanceof NettyByteBufDataInputStream) { + if (inputStream instanceof NettyByteBufDataInputStream) { // getBuffer() doesn't increase the reference count on this ByteBuf. ByteBuf nettyByteBuf = ((NettyByteBufDataInputStream) inputStream).getBuffer(); // construct a java.nio.ByteBuffer to create a ByteBufferInputStream @@ -323,10 +297,6 @@ public static ByteBuf readNettyByteBufFromCrcInputStream(CrcInputStream crcStrea output = nettyByteBuf.retainedSlice(startIndex, dataSize); crcStream.updateCrc(output.nioBuffer()); nettyByteBuf.readerIndex(startIndex + dataSize); - } else if (inputStream instanceof ByteBufferDataInputStream) { - ByteBuffer buffer = getByteBufferFromByteBufferDataInputStream((ByteBufferDataInputStream) inputStream, dataSize); - crcStream.updateCrc(buffer.duplicate()); - output = Unpooled.wrappedBuffer(buffer); } else { ByteBuffer buffer = getByteBufferFromInputStream(crcStream, dataSize); output = Unpooled.wrappedBuffer(buffer); @@ -912,33 +882,6 @@ public static byte[] readBytesFromByteBuf(ByteBuf buffer, byte[] data, int offse return data; } - /** - * Create a {@link DataInputStream} from the given buffer, which has to be either a {@link ByteBuffer} or a {@link ByteBuf}. - * This is equivalent to {@link #createDataInputStreamFromBuffer(Object, boolean)}, where the {@code shareMemory} is false. - * @param buffer The buffer where we are going to create a {@link DataInputStream} from. - * @return {@link DataInputStream}. - */ - public static DataInputStream createDataInputStreamFromBuffer(Object buffer) { - return createDataInputStreamFromBuffer(buffer, false); - } - - /** - * Create a {@link DataInputStream} from the given buffer, which has to be either a {@link ByteBuffer} or a {@link ByteBuf}. - * @param buffer The buffer where we are going to create a {@link DataInputStream} from. - * @param shareMemory If true, the {@link DataInputStream} would share the memory with the given buffer. - * @return {@link DataInputStream}. - */ - public static DataInputStream createDataInputStreamFromBuffer(Object buffer, boolean shareMemory) { - if (shareMemory) { - return buffer instanceof ByteBuf ? new NettyByteBufDataInputStream((ByteBuf) buffer) - : new ByteBufferDataInputStream((ByteBuffer) buffer); - } else { - InputStream src = buffer instanceof ByteBuf ? new ByteBufInputStream((ByteBuf) buffer) - : new ByteBufferInputStream((ByteBuffer) buffer); - return new DataInputStream(src); - } - } - /** * Split the input string "data" using the delimiter and return as list of strings for the slices obtained. * This method will ignore empty segments. That is, a call like {@code splitString(",a1,,b2,c3,", ","}} will return diff --git a/ambry-utils/src/test/java/com.github.ambry.utils/UtilsTest.java b/ambry-utils/src/test/java/com.github.ambry.utils/UtilsTest.java index 64adfb52ad..48f1cd33bc 100644 --- a/ambry-utils/src/test/java/com.github.ambry.utils/UtilsTest.java +++ b/ambry-utils/src/test/java/com.github.ambry.utils/UtilsTest.java @@ -198,50 +198,6 @@ public void testReadBuffers() throws IOException { } } - @Test - public void testGetByteBufferInputStreamFromCrcStreamShareMemory() throws Exception { - int blobSize = 1000; - // The first 8 bytes are the size of blob, the next 1000 bytes are the blob content, the next 8 bytes are the crc - // value, and we do this twice. - int bufferSize = (Long.SIZE / Byte.SIZE + blobSize + Long.SIZE / Byte.SIZE) * 2; - byte[] firstRandomBytes = new byte[blobSize]; - byte[] secondRandomBytes = new byte[blobSize]; - new Random().nextBytes(firstRandomBytes); - new Random().nextBytes(secondRandomBytes); - - ByteBuffer buffer = ByteBuffer.allocate(bufferSize); - ByteBufferOutputStream bbos = new ByteBufferOutputStream(buffer); - - // Fill the buffer - byte[] arrayToFill = firstRandomBytes; - while (arrayToFill != null) { - CrcOutputStream crcStream = new CrcOutputStream(bbos); - DataOutputStream dos = new DataOutputStream(crcStream); - dos.writeLong((long) blobSize); - dos.write(arrayToFill); - buffer.putLong(crcStream.getValue()); - arrayToFill = (arrayToFill == firstRandomBytes) ? secondRandomBytes : null; - } - - buffer.flip(); - byte[] expectedArray = firstRandomBytes; - while (expectedArray != null) { - CrcInputStream cis = new CrcInputStream(new ByteBufferDataInputStream(buffer)); - DataInputStream dis = new DataInputStream(cis); - long dataSize = dis.readLong(); - assertEquals(dataSize, blobSize); - ByteBufferInputStream obtained = Utils.getByteBufferInputStreamFromCrcInputStream(cis, (int) dataSize); - // Make sure these two ByteBuffers actually share the underlying memory. - assertEquals(getByteArrayFromByteBuffer(buffer), getByteArrayFromByteBuffer(obtained.getByteBuffer())); - byte[] obtainedArray = new byte[blobSize]; - obtained.read(obtainedArray); - assertArrayEquals(obtainedArray, expectedArray); - long crcRead = buffer.getLong(); - assertEquals(crcRead, cis.getValue()); - expectedArray = (expectedArray == firstRandomBytes) ? secondRandomBytes : null; - } - } - @Test public void testGetByteBufferInputStreamFromCrcStreamShareMemoryWithNettyByteBuf() throws Exception { int blobSize = 1000;