diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8555753d98b97..0b9a9c3e9fc94 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -196,11 +196,11 @@ public class ManagedCursorImpl implements ManagedCursor { position.ackSet = null; return position; }; - private final RangeSetWrapper individualDeletedMessages; + protected final RangeSetWrapper individualDeletedMessages; // Maintain the deletion status for batch messages // (ledgerId, entryId) -> deletion indexes - private final ConcurrentSkipListMap batchDeletedIndexes; + protected final ConcurrentSkipListMap batchDeletedIndexes; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private RateLimiter markDeleteLimiter; @@ -3622,4 +3622,29 @@ public boolean isCacheReadEntry() { public ManagedLedgerConfig getConfig() { return config; } + + /*** + * Create a non-durable cursor and copy the ack stats. + */ + public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException { + NonDurableCursorImpl newNonDurableCursor = + (NonDurableCursorImpl) ledger.newNonDurableCursor(getMarkDeletedPosition(), nonDurableCursorName); + if (individualDeletedMessages != null) { + this.individualDeletedMessages.forEach(range -> { + newNonDurableCursor.individualDeletedMessages.addOpenClosed( + range.lowerEndpoint().getLedgerId(), + range.lowerEndpoint().getEntryId(), + range.upperEndpoint().getLedgerId(), + range.upperEndpoint().getEntryId()); + return true; + }); + } + if (batchDeletedIndexes != null) { + for (Map.Entry entry : this.batchDeletedIndexes.entrySet()) { + BitSetRecyclable copiedBitSet = BitSetRecyclable.valueOf(entry.getValue()); + newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), copiedBitSet); + } + } + return newNonDurableCursor; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 1baa4087e5571..a01904d86f32d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Optional; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -530,9 +531,15 @@ public String getTypeString() { return "Null"; } - @Override public CompletableFuture analyzeBacklog(Optional position) { - + final ManagedLedger managedLedger = topic.getManagedLedger(); + final String newNonDurableCursorName = "analyze-backlog-" + UUID.randomUUID(); + ManagedCursor newNonDurableCursor; + try { + newNonDurableCursor = ((ManagedCursorImpl) cursor).duplicateNonDurableCursor(newNonDurableCursorName); + } catch (ManagedLedgerException e) { + return CompletableFuture.failedFuture(e); + } long start = System.currentTimeMillis(); if (log.isDebugEnabled()) { log.debug("[{}][{}] Starting to analyze backlog", topicName, subName); @@ -547,7 +554,7 @@ public CompletableFuture analyzeBacklog(Optional AtomicLong rejectedMessages = new AtomicLong(); AtomicLong rescheduledMessages = new AtomicLong(); - Position currentPosition = cursor.getMarkDeletedPosition(); + Position currentPosition = newNonDurableCursor.getMarkDeletedPosition(); if (log.isDebugEnabled()) { log.debug("[{}][{}] currentPosition {}", @@ -607,7 +614,7 @@ public CompletableFuture analyzeBacklog(Optional return true; }; - return cursor.scan( + CompletableFuture res = newNonDurableCursor.scan( position, condition, batchSize, @@ -634,7 +641,22 @@ public CompletableFuture analyzeBacklog(Optional topicName, subName, end - start, result); return result; }); + res.whenComplete((__, ex) -> { + managedLedger.asyncDeleteCursor(newNonDurableCursorName, + new AsyncCallbacks.DeleteCursorCallback(){ + @Override + public void deleteCursorComplete(Object ctx) { + // Nothing to do. + } + @Override + public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}][{}] Delete non-durable cursor[{}] failed when analyze backlog.", + topicName, subName, newNonDurableCursor.getName()); + } + }, null); + }); + return res; } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index f0bc80fa36495..bbcae37c4e21b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -3389,4 +3389,33 @@ private void testSetBacklogQuotasNamespaceLevelIfRetentionExists() throws Except // cleanup. admin.namespaces().deleteNamespace(ns); } + + @Test + private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topic); + // Send 10 messages. + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription) + .receiverQueueSize(0).subscribe(); + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + for (int i = 0; i < 10; i++) { + producer.send(i + ""); + } + + // Verify consumer can receive all messages after calling "analyzeSubscriptionBacklog". + admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.of(MessageIdImpl.earliest)); + for (int i = 0; i < 10; i++) { + Awaitility.await().untilAsserted(() -> { + Message m = consumer.receive(); + assertNotNull(m); + consumer.acknowledge(m); + }); + } + + // cleanup. + consumer.close(); + producer.close(); + admin.topics().delete(topic); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index 64b2a58ab86e8..f8aa3dc355d92 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -154,17 +154,17 @@ private void verifyBacklog(String topic, String subscription, int numEntries, in AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklogResult = admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.empty()); - assertEquals(numEntries, analyzeSubscriptionBacklogResult.getEntries()); - assertEquals(numEntries, analyzeSubscriptionBacklogResult.getFilterAcceptedEntries()); - assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRejectedEntries()); - assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledEntries()); - assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledEntries()); + assertEquals(analyzeSubscriptionBacklogResult.getEntries(), numEntries); + assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedEntries(), numEntries); + assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedEntries(), 0); + assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0); + assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0); - assertEquals(numMessages, analyzeSubscriptionBacklogResult.getMessages()); - assertEquals(numMessages, analyzeSubscriptionBacklogResult.getFilterAcceptedMessages()); - assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRejectedMessages()); + assertEquals(analyzeSubscriptionBacklogResult.getMessages(), numMessages); + assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(), numMessages); + assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0); - assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledMessages()); + assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledMessages(), 0); assertFalse(analyzeSubscriptionBacklogResult.isAborted()); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java index 12ce7eb74c72b..b801d5f2b05a1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/BitSetRecyclable.java @@ -216,6 +216,14 @@ public static BitSetRecyclable valueOf(byte[] bytes) { return BitSetRecyclable.valueOf(ByteBuffer.wrap(bytes)); } + /** + * Copy a BitSetRecyclable. + */ + public static BitSetRecyclable valueOf(BitSetRecyclable src) { + // The internal implementation will do the array-copy. + return valueOf(src.words); + } + /** * Returns a new bit set containing all the bits in the given byte * buffer between its position and limit.