Skip to content

Commit

Permalink
Fix handle of ByteBuf with multi nioBuffer in EpollDatagramChannel an…
Browse files Browse the repository at this point in the history
…d KQueueDatagramChannel

Motivation:
1. special handling of ByteBuf with multi nioBuffer rather than type of CompositeByteBuf (eg. DuplicatedByteBuf with CompositeByteBuf)
2. EpollDatagramUnicastTest and KQueueDatagramUnicastTest passed because CompositeByteBuf is converted to DuplicatedByteBuf before write to channel
3. uninitalized struct msghdr will raise error

Modifications:
1. isBufferCopyNeededForWrite(like isSingleDirectBuffer in NioDatgramChannel) checks wether a new direct buffer is needed
2. special handling of ByteBuf with multi nioBuffer in EpollDatagramChannel, AbstractEpollStreamChannel, KQueueDatagramChannel, AbstractKQueueStreamChannel and IovArray
3. initalize struct msghdr

Result:
handle of ByteBuf with multi nioBuffer in EpollDatagramChannel and KQueueDatagramChannel are ok
  • Loading branch information
louxiu authored and normanmaurer committed May 26, 2017
1 parent b419bd1 commit 3c4dfed
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,18 @@
public class DatagramUnicastTest extends AbstractDatagramTest {

private static final byte[] BYTES = {0, 1, 2, 3};
private enum WrapType {
NONE, DUP, SLICE,
}

@Test
public void testSimpleSendDirectByteBuf() throws Throwable {
run();
}

public void testSimpleSendDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
testSimpleSend0(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 1);
testSimpleSend0(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 4);
testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 1);
testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), true, BYTES, 4);
}

@Test
Expand All @@ -52,8 +56,8 @@ public void testSimpleSendHeapByteBuf() throws Throwable {
}

public void testSimpleSendHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
testSimpleSend0(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 1);
testSimpleSend0(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 4);
testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 1);
testSimpleSend(sb, cb, Unpooled.buffer().writeBytes(BYTES), true, BYTES, 4);
}

@Test
Expand All @@ -65,12 +69,12 @@ public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) thr
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
testSimpleSend0(sb, cb, buf, true, BYTES, 1);
testSimpleSend(sb, cb, buf, true, BYTES, 1);

CompositeByteBuf buf2 = Unpooled.compositeBuffer();
buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
testSimpleSend0(sb, cb, buf2, true, BYTES, 4);
testSimpleSend(sb, cb, buf2, true, BYTES, 4);
}

@Test
Expand All @@ -82,12 +86,12 @@ public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throw
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
testSimpleSend0(sb, cb, buf, true, BYTES, 1);
testSimpleSend(sb, cb, buf, true, BYTES, 1);

CompositeByteBuf buf2 = Unpooled.compositeBuffer();
buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
testSimpleSend0(sb, cb, buf2, true, BYTES, 4);
testSimpleSend(sb, cb, buf2, true, BYTES, 4);
}

@Test
Expand All @@ -99,12 +103,12 @@ public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) thro
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
testSimpleSend0(sb, cb, buf, true, BYTES, 1);
testSimpleSend(sb, cb, buf, true, BYTES, 1);

CompositeByteBuf buf2 = Unpooled.compositeBuffer();
buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
testSimpleSend0(sb, cb, buf2, true, BYTES, 4);
testSimpleSend(sb, cb, buf2, true, BYTES, 4);
}

@Test
Expand All @@ -113,13 +117,21 @@ public void testSimpleSendWithoutBind() throws Throwable {
}

public void testSimpleSendWithoutBind(Bootstrap sb, Bootstrap cb) throws Throwable {
testSimpleSend0(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 1);
testSimpleSend0(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 4);
testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 1);
testSimpleSend(sb, cb, Unpooled.directBuffer().writeBytes(BYTES), false, BYTES, 4);
}

private void testSimpleSend(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
final byte[] bytes, int count) throws Throwable {
for (WrapType type: WrapType.values()) {
testSimpleSend0(sb, cb, buf.retain(), bindClient, bytes, count, type);
}
assertTrue(buf.release());
}

@SuppressWarnings("deprecation")
private void testSimpleSend0(Bootstrap sb, Bootstrap cb, ByteBuf buf, boolean bindClient,
final byte[] bytes, int count)
final byte[] bytes, int count, WrapType wrapType)
throws Throwable {
final CountDownLatch latch = new CountDownLatch(count);

Expand Down Expand Up @@ -177,7 +189,15 @@ public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exceptio
}

for (int i = 0; i < count; i++) {
cc.write(new DatagramPacket(buf.retain().duplicate(), addr));
if (wrapType == WrapType.DUP) {
cc.write(new DatagramPacket(buf.retain().duplicate(), addr));
} else if (wrapType == WrapType.SLICE) {
cc.write(new DatagramPacket(buf.retain().slice(), addr));
} else if (wrapType == WrapType.NONE) {
cc.write(new DatagramPacket(buf.retain(), addr));
} else {
throw new Exception("unknown wrap type: " + wrapType);
}
}
// release as we used buf.retain() before
buf.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
Expand All @@ -35,6 +34,7 @@
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.SocketWritableByteChannel;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.ThrowableUtil;
Expand All @@ -53,7 +53,6 @@
import java.util.concurrent.TimeUnit;

import static io.netty.channel.unix.FileDescriptor.pipe;
import static io.netty.channel.unix.Limits.IOV_MAX;
import static io.netty.util.internal.ObjectUtil.checkNotNull;

public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
Expand Down Expand Up @@ -539,24 +538,7 @@ private boolean doWriteMultiple(ChannelOutboundBuffer in, int writeSpinCount) th
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
if (buf instanceof CompositeByteBuf) {
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
// in the CompositeByteBuf are backed by a memoryAddress.
CompositeByteBuf comp = (CompositeByteBuf) buf;
if (!comp.isDirect() || comp.nioBufferCount() > IOV_MAX) {
// more then 1024 buffers for gathering writes so just do a memory copy.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
} else {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
}
return buf;
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
}

if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
Expand All @@ -30,6 +29,7 @@
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.unix.DatagramSocketAddress;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;

Expand All @@ -45,7 +45,6 @@
import java.util.List;

import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
import static io.netty.channel.unix.Limits.IOV_MAX;

/**
* {@link DatagramChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
Expand Down Expand Up @@ -373,7 +372,7 @@ private boolean doWriteMessage(Object msg) throws Exception {
long memoryAddress = data.memoryAddress();
writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort());
} else if (data instanceof CompositeByteBuf) {
} else if (data.nioBufferCount() > 1) {
IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
array.add(data);
int cnt = array.count();
Expand All @@ -395,43 +394,13 @@ protected Object filterOutboundMessage(Object msg) {
if (msg instanceof DatagramPacket) {
DatagramPacket packet = (DatagramPacket) msg;
ByteBuf content = packet.content();
if (content.hasMemoryAddress()) {
return msg;
}

if (content.isDirect() && content instanceof CompositeByteBuf) {
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
// in the CompositeByteBuf are backed by a memoryAddress.
CompositeByteBuf comp = (CompositeByteBuf) content;
if (comp.isDirect() && comp.nioBufferCount() <= IOV_MAX) {
return msg;
}
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return new DatagramPacket(newDirectBuffer(packet, content), packet.recipient());
return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
}

if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
if (buf instanceof CompositeByteBuf) {
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
// in the CompositeByteBuf are backed by a memoryAddress.
CompositeByteBuf comp = (CompositeByteBuf) buf;
if (!comp.isDirect() || comp.nioBufferCount() > IOV_MAX) {
// more then 1024 buffers for gathering writes so just do a memory copy.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
} else {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
}
return buf;
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
}

if (msg instanceof AddressedEnvelope) {
Expand All @@ -441,21 +410,9 @@ protected Object filterOutboundMessage(Object msg) {
(e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {

ByteBuf content = (ByteBuf) e.content();
if (content.hasMemoryAddress()) {
return e;
}
if (content instanceof CompositeByteBuf) {
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
// in the CompositeByteBuf are backed by a memoryAddress.
CompositeByteBuf comp = (CompositeByteBuf) content;
if (comp.isDirect() && comp.nioBufferCount() <= IOV_MAX) {
return e;
}
}
// We can only handle direct buffers so we need to copy if a non direct is
// passed to write.
return new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
newDirectBuffer(e, content), (InetSocketAddress) e.recipient());
return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
Expand All @@ -33,6 +32,7 @@
import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.unix.IovArray;
import io.netty.channel.unix.SocketWritableByteChannel;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.ThrowableUtil;
Expand All @@ -48,8 +48,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static io.netty.channel.unix.Limits.IOV_MAX;

@UnstableApi
public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
Expand Down Expand Up @@ -369,24 +367,7 @@ private boolean doWriteMultiple(ChannelOutboundBuffer in, int writeSpinCount) th
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.hasMemoryAddress() && (PlatformDependent.hasUnsafe() || !buf.isDirect())) {
if (buf instanceof CompositeByteBuf) {
// Special handling of CompositeByteBuf to reduce memory copies if some of the Components
// in the CompositeByteBuf are backed by a memoryAddress.
CompositeByteBuf comp = (CompositeByteBuf) buf;
if (!comp.isDirect() || comp.nioBufferCount() > IOV_MAX) {
// more then 1024 buffers for gathering writes so just do a memory copy.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
} else {
// We can only handle buffers with memory address so we need to copy if a non direct is
// passed to write.
buf = newDirectBuffer(buf);
assert buf.hasMemoryAddress();
}
}
return buf;
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
}

if (msg instanceof FileRegion) {
Expand Down
Loading

0 comments on commit 3c4dfed

Please sign in to comment.