Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker] Miss headersAndPayload and messageIdData in MessagePubl…
Browse files Browse the repository at this point in the history
…ishContext (apache#21245)
  • Loading branch information
kecona authored Sep 27, 2023
1 parent afc9244 commit 7a3f304
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ServerError;
Expand Down Expand Up @@ -186,14 +187,15 @@ public boolean isSuccessorTo(Producer other) {
}

public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
boolean isChunked, boolean isMarker, Position position) {
if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, position)) {
publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, position);
boolean isChunked, boolean isMarker, MessageIdData messageIdData) {
if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, messageIdData)) {
publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, messageIdData);
}
}

public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker, Position position) {
ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker,
MessageIdData messageIdData) {
if (lowestSequenceId > highestSequenceId) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError,
Expand All @@ -202,23 +204,23 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS
});
return;
}
if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize, position)) {
if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize, messageIdData)) {
publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked,
isMarker, position);
isMarker, messageIdData);
}
}

public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
Position position) {
if (!isShadowTopic && position != null) {
MessageIdData messageIdData) {
if (!isShadowTopic && messageIdData != null) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
"Only shadow topic supports sending messages with messageId");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return false;
}
if (isShadowTopic && position == null) {
if (isShadowTopic && messageIdData == null) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
"Cannot send messages to a shadow topic");
Expand Down Expand Up @@ -267,10 +269,10 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked,
boolean isMarker, Position position) {
boolean isMarker, MessageIdData messageIdData) {
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker, position);
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload,
batchSize, isChunked, System.nanoTime(), isMarker, messageIdData);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand All @@ -279,10 +281,11 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, l
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
long batchSize, boolean isChunked, boolean isMarker, Position position) {
long batchSize, boolean isChunked, boolean isMarker,
MessageIdData messageIdData) {
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, position);
highestSequenceId, msgIn, headersAndPayload, batchSize,
isChunked, System.nanoTime(), isMarker, messageIdData);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand Down Expand Up @@ -373,6 +376,8 @@ private static final class MessagePublishContext implements PublishContext, Runn
private long sequenceId;
private long ledgerId;
private long entryId;
private MessageIdData messageIdData;
private ByteBuf headerAndPayload;
private Rate rateIn;
private int msgSize;
private long batchSize;
Expand Down Expand Up @@ -549,43 +554,49 @@ public void run() {
recycle();
}

static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, ByteBuf headersAndPayload,
long batchSize, boolean chunked, long startTimeNs, boolean isMarker,
MessageIdData messageIdData) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = sequenceId;
callback.rateIn = rateIn;
callback.msgSize = msgSize;
callback.msgSize = headersAndPayload.readableBytes();
callback.batchSize = batchSize;
callback.chunked = chunked;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.isMarker = isMarker;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
callback.headerAndPayload = headersAndPayload;
callback.messageIdData = messageIdData;
callback.ledgerId = messageIdData == null ? -1 : messageIdData.getLedgerId();
callback.entryId = messageIdData == null ? -1 : messageIdData.getEntryId();
if (callback.propertyMap != null) {
callback.propertyMap.clear();
}
return callback;
}

static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) {
ByteBuf headersAndPayload, long batchSize, boolean chunked, long startTimeNs,
boolean isMarker, MessageIdData messageIdData) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
callback.highestSequenceId = highestSequenceId;
callback.rateIn = rateIn;
callback.msgSize = msgSize;
callback.msgSize = headersAndPayload.readableBytes();
callback.batchSize = batchSize;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.chunked = chunked;
callback.isMarker = isMarker;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
callback.headerAndPayload = headersAndPayload;
callback.messageIdData = messageIdData;
callback.ledgerId = messageIdData == null ? -1 : messageIdData.getLedgerId();
callback.entryId = messageIdData == null ? -1 : messageIdData.getEntryId();
if (callback.propertyMap != null) {
callback.propertyMap.clear();
}
Expand All @@ -607,6 +618,16 @@ public boolean isMarkerMessage() {
return isMarker;
}

@Override
public MessageIdData getMessageIdData() {
return messageIdData;
}

@Override
public ByteBuf getHeaderAndPayload() {
return headerAndPayload;
}

private final Handle<MessagePublishContext> recyclerHandle;

private MessagePublishContext(Handle<MessagePublishContext> recyclerHandle) {
Expand All @@ -633,6 +654,8 @@ public void recycle() {
startTimeNs = -1L;
chunked = false;
isMarker = false;
messageIdData = null;
headerAndPayload = null;
if (propertyMap != null) {
propertyMap.clear();
}
Expand Down Expand Up @@ -811,7 +834,7 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon
}
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null);
headersAndPayload, batchSize, isChunked, System.nanoTime(), isMarker, null);
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1752,17 +1752,15 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
return;
}

// This position is only used for shadow replicator
Position position = send.hasMessageId()
? PositionImpl.get(send.getMessageId().getLedgerId(), send.getMessageId().getEntryId()) : null;
MessageIdData messageIdData = send.hasMessageId() ? send.getMessageId() : null;

// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), messageIdData);
} else {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
send.getNumMessages(), send.isIsChunk(), send.isMarker(), position);
send.getNumMessages(), send.isIsChunk(), send.isMarker(), messageIdData);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.EntryFilters;
Expand Down Expand Up @@ -127,6 +128,14 @@ default long getEntryTimestamp() {
default void setEntryTimestamp(long entryTimestamp) {

}

default MessageIdData getMessageIdData() {
return null;
}

default ByteBuf getHeaderAndPayload() {
return null;
}
}

CompletableFuture<Void> initialize();
Expand Down

0 comments on commit 7a3f304

Please sign in to comment.