Skip to content

Commit

Permalink
fix unittest
Browse files Browse the repository at this point in the history
  • Loading branch information
bxfjb committed Sep 27, 2024
1 parent 5cb60b7 commit dd99891
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,6 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
EncodeResult encodeResult;

String topicQueueKey = msg.getTopic() + "-" + msg.getQueueId();
ByteBuffer preEncodeBuffer = msg.getEncodedBuff();
final boolean isMultiDispatchMsg = CommitLog.isMultiDispatchMsg(this.defaultMessageStore.getMessageStoreConfig(), msg);
topicQueueLock.lock(topicQueueKey);
try {
Expand All @@ -585,15 +584,15 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
long queueOffset;
if (isMultiDispatchMsg) {
AppendMessageResult appendMessageResult = MultiDispatchUtils.handlePropertiesForLmqMsg(
preEncodeBuffer, msg, this.multiDispatch, this.getMessageStore().getMessageStoreConfig().getMaxMessageSize());
encodeResult.data, msg, this.multiDispatch, this.getMessageStore().getMessageStoreConfig());
if (appendMessageResult != null) {
PutMessageStatus putMessageStatus = PutMessageStatus.MESSAGE_ILLEGAL;
return CompletableFuture.completedFuture(new PutMessageResult(putMessageStatus, appendMessageResult));
}
}
final int msgLen = preEncodeBuffer.getInt(0);
preEncodeBuffer.position(0);
preEncodeBuffer.limit(msgLen);
final int msgLen = encodeResult.data.getInt(0);
encodeResult.data.position(0);
encodeResult.data.limit(msgLen);

try {
beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MultiDispatch;
import org.apache.rocketmq.store.config.MessageStoreConfig;
Expand Down Expand Up @@ -70,19 +71,25 @@ public static boolean checkMultiDispatchQueue(MessageStoreConfig messageStoreCon
}

public static AppendMessageResult handlePropertiesForLmqMsg(
ByteBuffer preEncodeBuffer, final MessageExtBrokerInner msgInner, MultiDispatch multiDispatch, int messageMaxSize) {
ByteBuffer preEncodeBuffer, final MessageExtBrokerInner msgInner, MultiDispatch multiDispatch, MessageStoreConfig messageStoreConfig) {
if (msgInner.isEncodeCompleted()) {
return null;
}

int crc32ReservedLength = messageStoreConfig.isEnabledAppendPropCRC() ? CommitLog.CRC32_RESERVED_LEN : 0;

multiDispatch.wrapMultiDispatch(msgInner);

msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
boolean needAppendLastPropertySeparator = messageStoreConfig.isEnabledAppendPropCRC() && propertiesData != null && propertiesData.length > 0
&& propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR;

final int propertiesLength =
(propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength;

if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
Expand All @@ -94,8 +101,8 @@ public static AppendMessageResult handlePropertiesForLmqMsg(
int msgLen = msgLenWithoutProperties + 2 + propertiesLength;

// Exceeds the maximum message
if (msgLen > messageMaxSize) {
log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + messageMaxSize);
if (msgLen > messageStoreConfig.getMaxMessageSize()) {
log.warn("message size exceeded, msg total size: " + msgLen + ", maxMessageSize: " + messageStoreConfig.getMaxMessageSize());
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}

Expand All @@ -106,6 +113,16 @@ public static AppendMessageResult handlePropertiesForLmqMsg(

preEncodeBuffer.putShort((short) propertiesLength);

if (propertiesLength > crc32ReservedLength) {
preEncodeBuffer.put(propertiesData);
}

if (needAppendLastPropertySeparator) {
preEncodeBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
}
// 18 CRC32
preEncodeBuffer.position(preEncodeBuffer.position() + crc32ReservedLength);

msgInner.setEncodeCompleted(true);

return null;
Expand Down

0 comments on commit dd99891

Please sign in to comment.