diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a375ebf2809ed..5637054e3fd6b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -30,6 +30,7 @@ import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.FastThreadLocal; +import java.io.IOException; import java.time.Clock; import java.util.ArrayList; import java.util.Collections; @@ -144,9 +145,13 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.api.proto.CompressionType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.api.proto.TxnAction; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; @@ -3583,49 +3588,127 @@ public Position getLastPosition() { return ledger.getLastConfirmedEntry(); } - @Override - public CompletableFuture getLastMessageId() { - CompletableFuture completableFuture = new CompletableFuture<>(); - PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry(); - String name = getName(); - int partitionIndex = TopicName.getPartitionIndex(name); - if (log.isDebugEnabled()) { - log.debug("getLastMessageId {}, partitionIndex{}, position {}", name, partitionIndex, position); + private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException { + int batchSize = metadata.getNumMessagesInBatch(); + if (batchSize <= 1){ + return -1; } - if (position.getEntryId() == -1) { - completableFuture - .complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex)); - return completableFuture; + if (metadata.hasCompression()) { + var tmp = payload; + CompressionType compressionType = metadata.getCompression(); + CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType); + int uncompressedSize = metadata.getUncompressedSize(); + payload = codec.decode(payload, uncompressedSize); + tmp.release(); } - ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger; - if (!ledgerImpl.ledgerExists(position.getLedgerId())) { - completableFuture - .complete(MessageId.earliest); - return completableFuture; + SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); + int lastBatchIndexInBatch = -1; + for (int i = 0; i < batchSize; i++) { + ByteBuf singleMessagePayload = + Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize); + singleMessagePayload.release(); + if (singleMessageMetadata.isCompactedOut()) { + continue; + } + lastBatchIndexInBatch = i; } - ledgerImpl.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { - @Override - public void readEntryComplete(Entry entry, Object ctx) { + return lastBatchIndexInBatch; + } + + private CompletableFuture getLastMessageIdFromCompactionService(int partitionIndex) { + CompletableFuture completableFuture = new CompletableFuture<>(); + getTopicCompactionService().readLastCompactedEntry().thenAccept(entry -> { + if (entry == null) { + completableFuture.complete(MessageId.earliest); + } else { try { MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - if (metadata.hasNumMessagesInBatch()) { - completableFuture.complete(new BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(), - partitionIndex, metadata.getNumMessagesInBatch() - 1)); + int lastBatchIndexInBatch = calculateTheLastBatchIndexInBatch(metadata, entry.getDataBuffer()); + if (lastBatchIndexInBatch != -1) { + completableFuture.complete(new BatchMessageIdImpl(entry.getLedgerId(), entry.getEntryId(), + partitionIndex, lastBatchIndexInBatch)); } else { - completableFuture - .complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), - partitionIndex)); + completableFuture.complete(new MessageIdImpl(entry.getLedgerId(), entry.getEntryId(), + partitionIndex)); } + } catch (Exception e) { + completableFuture.completeExceptionally(e); } finally { entry.release(); } } + }).exceptionally(ex -> { + completableFuture.completeExceptionally(ex); + return null; + }); + return completableFuture; + } - @Override - public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - completableFuture.completeExceptionally(exception); + @Override + public CompletableFuture getLastMessageId() { + CompletableFuture completableFuture = new CompletableFuture<>(); + checkIfTransactionBufferRecoverCompletely(true).thenRun(()->{ + PositionImpl lastPosition = getMaxReadPosition(); + String name = getName(); + int partitionIndex = TopicName.getPartitionIndex(name); + if (log.isDebugEnabled()) { + log.debug("getLastMessageId {}, partitionIndex{}, lastPosition {}", name, partitionIndex, lastPosition); } - }, null); + CompletableFuture compactionHorizonFuture = + getTopicCompactionService().getLastCompactedPosition(); + compactionHorizonFuture.thenAccept(compactionHorizon -> { + if (lastPosition.getEntryId() == -1 || (compactionHorizon != null + && lastPosition.compareTo((PositionImpl) compactionHorizon) <= 0)) { + getLastMessageIdFromCompactionService(partitionIndex).thenAccept(messageId -> { + completableFuture.complete(messageId); + }).exceptionally(ex -> { + completableFuture.completeExceptionally(ex); + return null; + }); + return; + } + ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger; + ledgerImpl.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + try { + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + if (metadata.hasNumMessagesInBatch()) { + completableFuture.complete(new BatchMessageIdImpl(lastPosition.getLedgerId(), + lastPosition.getEntryId(), partitionIndex, + metadata.getNumMessagesInBatch() - 1)); + } else { + completableFuture.complete(new MessageIdImpl(lastPosition.getLedgerId(), + lastPosition.getEntryId(), partitionIndex)); + } + } finally { + entry.release(); + } + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + if (exception instanceof ManagedLedgerException.NonRecoverableLedgerException) { + getLastMessageIdFromCompactionService(partitionIndex).thenAccept(messageId -> { + completableFuture.complete(messageId); + }).exceptionally(ex -> { + completableFuture.completeExceptionally(ex); + return null; + }); + } + completableFuture.completeExceptionally(exception); + } + }, null); + }).exceptionally(ex -> { + log.error("Failed to get compactionHorizon.", ex); + completableFuture.completeExceptionally(ex); + return null; + }); + }).exceptionally(ex -> { + log.error("Failed to checkIfTransactionBufferRecoverCompletely.", ex); + completableFuture.completeExceptionally(ex); + return null; + }); return completableFuture; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java index 317b1a227e585..38d94e6bb3997 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java @@ -264,6 +264,39 @@ public void testGetLastMessageIdAfterCompaction(boolean enabledBatch) throws Exc admin.topics().delete(topicName, false); } + @Test(dataProvider = "enabledBatch") + public void testGetLastMessageIdAfterCompactionAndExpire(boolean enabledBatch) throws Exception { + String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp"); + Producer producer = createProducer(enabledBatch, topicName); + // produce messages + producer.newMessage().key("k0").value("v0").sendAsync(); + producer.newMessage().key("k0").value("v1").sendAsync(); + producer.flush(); + + // trigger compaction + triggerCompactionAndWait(topicName); + + triggerLedgerSwitch(topicName); + clearAllTheLedgersOutdated(topicName); + + MessageIdImpl lastMessageIdByTopic = getLastMessageIdByTopic(topicName); + Consumer consumer = createConsumer(topicName, "sub"); + MessageIdImpl messageId = (MessageIdImpl) consumer.getLastMessageId(); + assertEquals(messageId.getLedgerId(), lastMessageIdByTopic.getLedgerId()); + assertEquals(messageId.getEntryId(), lastMessageIdByTopic.getEntryId()); + if (enabledBatch) { + BatchMessageIdImpl lastBatchMessageIdByTopic = (BatchMessageIdImpl) lastMessageIdByTopic; + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + assertEquals(batchMessageId.getBatchSize(), lastBatchMessageIdByTopic.getBatchSize()); + assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdByTopic.getBatchIndex()); + } + + // cleanup. + consumer.close(); + producer.close(); + admin.topics().delete(topicName, false); + } + @Test(dataProvider = "enabledBatch") public void testGetLastMessageIdAfterCompactionWithCompression(boolean enabledBatch) throws Exception { String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp");