Skip to content

Commit

Permalink
Fix allocate size by add 9 bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
maobaolong committed Aug 26, 2024
1 parent 9fd8ae0 commit 4f97b2d
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,17 @@ public int getLength() {
return length;
}

// calculate the data size for this block in memory including metadata which are
// partitionId, blockId, crc, taskAttemptId, length, uncompressLength
/**
* Calculate the data size for this block in memory including metadata which are partitionId,
* blockId, crc, taskAttemptId, uncompressLength and data length. This should be consistent with
* {@link ShufflePartitionedBlock#getSize()}.
*
* @return the encoded size of this object in memory
*/
public int getSize() {
return length + 3 * 8 + 2 * 4;
// FIXME(maobaolong): The size is calculated based on the Protobuf message ShuffleBlock.
// If Netty's custom serialization is used, the calculation logic here needs to be modified.
return length + 3 * Long.BYTES + 2 * Integer.BYTES;
}

public long getCrc() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,17 @@ public ShufflePartitionedBlock(
this.data = data;
}

// calculate the data size for this block in memory including metadata which are
// blockId, crc, taskAttemptId, length, uncompressLength
/**
* Calculate the data size for this block in memory including metadata which are partitionId,
* blockId, crc, taskAttemptId, uncompressLength and data length. This should be consistent with
* {@link ShuffleBlockInfo#getSize()}.
*
* @return the encoded size of this object in memory
*/
public long getSize() {
return length + 3 * 8 + 2 * 4;
// FIXME(maobaolong): The size is calculated based on the Protobuf message ShuffleBlock.
// If Netty's custom serialization is used, the calculation logic here needs to be modified.
return length + 3 * Long.BYTES + 2 * Integer.BYTES;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public final class MessageEncoder extends MessageToMessageEncoder<Message> {
private static final Logger logger = LoggerFactory.getLogger(MessageEncoder.class);

public static final MessageEncoder INSTANCE = new MessageEncoder();
public static final int MESSAGE_HEADER_SIZE =
// Inner message encodedLength + TYPE_ENCODED_LENGTH + bodyLength
Integer.BYTES + Message.TYPE_ENCODED_LENGTH + Integer.BYTES;

private MessageEncoder() {}

Expand Down Expand Up @@ -79,7 +82,7 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro

Message.Type msgType = in.type();
// message size, message type size, body size, message encoded length
int headerLength = Integer.BYTES + msgType.encodedLength() + Integer.BYTES + in.encodedLength();
int headerLength = MESSAGE_HEADER_SIZE + in.encodedLength();
ByteBuf header = ctx.alloc().heapBuffer(headerLength);
header.writeInt(in.encodedLength());
msgType.encode(header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.uniffle.common.netty.buffer.ManagedBuffer;

public abstract class Message implements Encodable {

public static final byte TYPE_ENCODED_LENGTH = 1;
private ManagedBuffer body;

protected Message() {
Expand Down Expand Up @@ -79,7 +79,7 @@ public byte id() {

@Override
public int encodedLength() {
return 1;
return TYPE_ENCODED_LENGTH;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.uniffle.common.exception.NotRetryException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.netty.MessageEncoder;
import org.apache.uniffle.common.netty.client.TransportClient;
import org.apache.uniffle.common.netty.client.TransportClientFactory;
import org.apache.uniffle.common.netty.client.TransportConf;
Expand Down Expand Up @@ -170,7 +171,14 @@ public RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest requ
0L,
stb.getValue(),
System.currentTimeMillis());
int allocateSize = size + sendShuffleDataRequest.encodedLength();
// allocateSize = MESSAGE_HEADER_SIZE + requestMessage encodedLength + data size
// {@link org.apache.uniffle.common.netty.MessageEncoder#encode}
// We calculated the size again, even though sendShuffleDataRequest.encodedLength()
// already included the data size, because there is a brief moment when decoding
// sendShuffleDataRequest at the shuffle server, where there are two copies of data
// in direct memory.
int allocateSize =
MessageEncoder.MESSAGE_HEADER_SIZE + sendShuffleDataRequest.encodedLength() + size;
int finalBlockNum = blockNum;
try {
RetryUtils.retryWithCondition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.netty.MessageEncoder;
import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.netty.client.TransportClient;
Expand Down Expand Up @@ -135,8 +136,9 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
PreAllocatedBufferInfo info =
shuffleTaskManager.getAndRemovePreAllocatedBuffer(requireBufferId);
int requireSize = info == null ? 0 : info.getRequireSize();
int encodedLength = req.encodedLength() + MessageEncoder.MESSAGE_HEADER_SIZE;
int requireBlocksSize =
requireSize - req.encodedLength() < 0 ? 0 : requireSize - req.encodedLength();
requireSize - encodedLength < 0 ? 0 : requireSize - encodedLength;

boolean isPreAllocated = info != null;

Expand Down

0 comments on commit 4f97b2d

Please sign in to comment.