Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MINOR] fix(client): Fix allocate size by add 9 bytes #1940

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,17 @@ public int getLength() {
return length;
}

// calculate the data size for this block in memory including metadata which are
// partitionId, blockId, crc, taskAttemptId, length, uncompressLength
/**
* Calculate the data size for this block in memory including metadata which are partitionId,
* blockId, crc, taskAttemptId, uncompressLength and data length. This should be consistent with
* {@link ShufflePartitionedBlock#getSize()}.
*
* @return the encoded size of this object in memory
*/
public int getSize() {
return length + 3 * 8 + 2 * 4;
// FIXME(maobaolong): The size is calculated based on the Protobuf message ShuffleBlock.
// If Netty's custom serialization is used, the calculation logic here needs to be modified.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean netty still use the protobuf de/serialization?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, Netty need only data size is enough, but for now server release usedmemory contains the block encoded size

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, Netty need only data size is enough

If so, why not removing the unused placeholder bits directly? Just for unified logic for grpc and netty?

return length + 3 * Long.BYTES + 2 * Integer.BYTES;
}

public long getCrc() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,17 @@ public ShufflePartitionedBlock(
this.data = data;
}

// calculate the data size for this block in memory including metadata which are
// blockId, crc, taskAttemptId, length, uncompressLength
/**
* Calculate the data size for this block in memory including metadata which are partitionId,
* blockId, crc, taskAttemptId, uncompressLength and data length. This should be consistent with
* {@link ShuffleBlockInfo#getSize()}.
*
* @return the encoded size of this object in memory
*/
public long getSize() {
return length + 3 * 8 + 2 * 4;
// FIXME(maobaolong): The size is calculated based on the Protobuf message ShuffleBlock.
// If Netty's custom serialization is used, the calculation logic here needs to be modified.
return length + 3 * Long.BYTES + 2 * Integer.BYTES;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public final class MessageEncoder extends MessageToMessageEncoder<Message> {
private static final Logger logger = LoggerFactory.getLogger(MessageEncoder.class);

public static final MessageEncoder INSTANCE = new MessageEncoder();
public static final int MESSAGE_HEADER_SIZE =
// Inner message encodedLength + TYPE_ENCODED_LENGTH + bodyLength
Integer.BYTES + Message.TYPE_ENCODED_LENGTH + Integer.BYTES;

private MessageEncoder() {}

Expand Down Expand Up @@ -79,7 +82,7 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro

Message.Type msgType = in.type();
// message size, message type size, body size, message encoded length
int headerLength = Integer.BYTES + msgType.encodedLength() + Integer.BYTES + in.encodedLength();
int headerLength = MESSAGE_HEADER_SIZE + in.encodedLength();
ByteBuf header = ctx.alloc().heapBuffer(headerLength);
header.writeInt(in.encodedLength());
msgType.encode(header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.uniffle.common.netty.buffer.ManagedBuffer;

public abstract class Message implements Encodable {

public static final byte TYPE_ENCODED_LENGTH = 1;
private ManagedBuffer body;

protected Message() {
Expand Down Expand Up @@ -79,7 +79,7 @@ public byte id() {

@Override
public int encodedLength() {
return 1;
return TYPE_ENCODED_LENGTH;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.uniffle.common.exception.NotRetryException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.netty.MessageEncoder;
import org.apache.uniffle.common.netty.client.TransportClient;
import org.apache.uniffle.common.netty.client.TransportClientFactory;
import org.apache.uniffle.common.netty.client.TransportConf;
Expand Down Expand Up @@ -170,7 +171,14 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
0L,
stb.getValue(),
System.currentTimeMillis());
int allocateSize = size + sendShuffleDataRequest.encodedLength();
// allocateSize = MESSAGE_HEADER_SIZE + requestMessage encodedLength + data size
// {@link org.apache.uniffle.common.netty.MessageEncoder#encode}
// We calculated the size again, even though sendShuffleDataRequest.encodedLength()
// already included the data size, because there is a brief moment when decoding
// sendShuffleDataRequest at the shuffle server, where there are two copies of data
// in direct memory.
int allocateSize =
MessageEncoder.MESSAGE_HEADER_SIZE + sendShuffleDataRequest.encodedLength() + size;
int finalBlockNum = blockNum;
try {
RetryUtils.retryWithCondition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.netty.MessageEncoder;
import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.netty.client.TransportClient;
Expand Down Expand Up @@ -135,8 +136,8 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
PreAllocatedBufferInfo info =
shuffleTaskManager.getAndRemovePreAllocatedBuffer(requireBufferId);
int requireSize = info == null ? 0 : info.getRequireSize();
int requireBlocksSize =
requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength();
int encodedLength = req.encodedLength() + MessageEncoder.MESSAGE_HEADER_SIZE;
int requireBlocksSize = requireSize - encodedLength < 0 ? 0 : requireSize - encodedLength;

boolean isPreAllocated = info != null;

Expand Down
Loading