From 7a3f304d6b6f658ef825b6464f192b42e4833a26 Mon Sep 17 00:00:00 2001 From: kecona <145982269+kecona@users.noreply.github.com> Date: Wed, 27 Sep 2023 11:45:25 +0800 Subject: [PATCH] [fix][broker] Miss headersAndPayload and messageIdData in MessagePublishContext (#21245) --- .../pulsar/broker/service/Producer.java | 73 ++++++++++++------- .../pulsar/broker/service/ServerCnx.java | 8 +- .../apache/pulsar/broker/service/Topic.java | 9 +++ 3 files changed, 60 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index f7d2bb2dd2797..70cca8b821286 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -46,6 +46,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; +import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProducerAccessMode; import org.apache.pulsar.common.api.proto.ServerError; @@ -186,14 +187,15 @@ public boolean isSuccessorTo(Producer other) { } public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, - boolean isChunked, boolean isMarker, Position position) { - if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, position)) { - publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, position); + boolean isChunked, boolean isMarker, MessageIdData messageIdData) { + if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, messageIdData)) { + publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, messageIdData); } } public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId, - ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker, Position position) { + ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker, + MessageIdData messageIdData) { if (lowestSequenceId > highestSequenceId) { cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError, @@ -202,15 +204,15 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS }); return; } - if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize, position)) { + if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize, messageIdData)) { publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked, - isMarker, position); + isMarker, messageIdData); } } public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, - Position position) { - if (!isShadowTopic && position != null) { + MessageIdData messageIdData) { + if (!isShadowTopic && messageIdData != null) { cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError, "Only shadow topic supports sending messages with messageId"); @@ -218,7 +220,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he }); return false; } - if (isShadowTopic && position == null) { + if (isShadowTopic && messageIdData == null) { cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError, "Cannot send messages to a shadow topic"); @@ -267,10 +269,10 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he } private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked, - boolean isMarker, Position position) { + boolean isMarker, MessageIdData messageIdData) { MessagePublishContext messagePublishContext = - MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), - batchSize, isChunked, System.nanoTime(), isMarker, position); + MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload, + batchSize, isChunked, System.nanoTime(), isMarker, messageIdData); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); @@ -279,10 +281,11 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, l } private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, - long batchSize, boolean isChunked, boolean isMarker, Position position) { + long batchSize, boolean isChunked, boolean isMarker, + MessageIdData messageIdData) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, - highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker, position); + highestSequenceId, msgIn, headersAndPayload, batchSize, + isChunked, System.nanoTime(), isMarker, messageIdData); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); @@ -373,6 +376,8 @@ private static final class MessagePublishContext implements PublishContext, Runn private long sequenceId; private long ledgerId; private long entryId; + private MessageIdData messageIdData; + private ByteBuf headerAndPayload; private Rate rateIn; private int msgSize; private long batchSize; @@ -549,21 +554,24 @@ public void run() { recycle(); } - static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize, - long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { + static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, ByteBuf headersAndPayload, + long batchSize, boolean chunked, long startTimeNs, boolean isMarker, + MessageIdData messageIdData) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = sequenceId; callback.rateIn = rateIn; - callback.msgSize = msgSize; + callback.msgSize = headersAndPayload.readableBytes(); callback.batchSize = batchSize; callback.chunked = chunked; callback.originalProducerName = null; callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.isMarker = isMarker; - callback.ledgerId = position == null ? -1 : position.getLedgerId(); - callback.entryId = position == null ? -1 : position.getEntryId(); + callback.headerAndPayload = headersAndPayload; + callback.messageIdData = messageIdData; + callback.ledgerId = messageIdData == null ? -1 : messageIdData.getLedgerId(); + callback.entryId = messageIdData == null ? -1 : messageIdData.getEntryId(); if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -571,21 +579,24 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn } static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, - int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { + ByteBuf headersAndPayload, long batchSize, boolean chunked, long startTimeNs, + boolean isMarker, MessageIdData messageIdData) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; callback.highestSequenceId = highestSequenceId; callback.rateIn = rateIn; - callback.msgSize = msgSize; + callback.msgSize = headersAndPayload.readableBytes(); callback.batchSize = batchSize; callback.originalProducerName = null; callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.chunked = chunked; callback.isMarker = isMarker; - callback.ledgerId = position == null ? -1 : position.getLedgerId(); - callback.entryId = position == null ? -1 : position.getEntryId(); + callback.headerAndPayload = headersAndPayload; + callback.messageIdData = messageIdData; + callback.ledgerId = messageIdData == null ? -1 : messageIdData.getLedgerId(); + callback.entryId = messageIdData == null ? -1 : messageIdData.getEntryId(); if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -607,6 +618,16 @@ public boolean isMarkerMessage() { return isMarker; } + @Override + public MessageIdData getMessageIdData() { + return messageIdData; + } + + @Override + public ByteBuf getHeaderAndPayload() { + return headerAndPayload; + } + private final Handle recyclerHandle; private MessagePublishContext(Handle recyclerHandle) { @@ -633,6 +654,8 @@ public void recycle() { startTimeNs = -1L; chunked = false; isMarker = false; + messageIdData = null; + headerAndPayload = null; if (propertyMap != null) { propertyMap.clear(); } @@ -811,7 +834,7 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon } MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null); + headersAndPayload, batchSize, isChunked, System.nanoTime(), isMarker, null); if (brokerInterceptor != null) { brokerInterceptor .onMessagePublish(this, headersAndPayload, messagePublishContext); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 0517fff0f03f5..d904a0547a586 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1752,17 +1752,15 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { return; } - // This position is only used for shadow replicator - Position position = send.hasMessageId() - ? PositionImpl.get(send.getMessageId().getLedgerId(), send.getMessageId().getEntryId()) : null; + MessageIdData messageIdData = send.hasMessageId() ? send.getMessageId() : null; // Persist the message if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) { producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), - headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); + headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), messageIdData); } else { producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, - send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); + send.getNumMessages(), send.isIsChunk(), send.isMarker(), messageIdData); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 7657d77e1299f..d38329c95169f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -35,6 +35,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.EntryFilters; @@ -127,6 +128,14 @@ default long getEntryTimestamp() { default void setEntryTimestamp(long entryTimestamp) { } + + default MessageIdData getMessageIdData() { + return null; + } + + default ByteBuf getHeaderAndPayload() { + return null; + } } CompletableFuture initialize();