Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix] [broker] Subscription stuck due to called Admin API analyzeSubs…
Browse files Browse the repository at this point in the history
…criptionBacklog (apache#22019)
  • Loading branch information
poorbarcode authored Feb 18, 2024
1 parent 220a3d6 commit 825e997
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ public class ManagedCursorImpl implements ManagedCursor {
position.ackSet = null;
return position;
};
private final RangeSetWrapper<PositionImpl> individualDeletedMessages;
protected final RangeSetWrapper<PositionImpl> individualDeletedMessages;

// Maintain the deletion status for batch messages
// (ledgerId, entryId) -> deletion indexes
private final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;
protected final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private RateLimiter markDeleteLimiter;
Expand Down Expand Up @@ -3622,4 +3622,29 @@ public boolean isCacheReadEntry() {
public ManagedLedgerConfig getConfig() {
return config;
}

/***
* Create a non-durable cursor and copy the ack stats.
*/
public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException {
NonDurableCursorImpl newNonDurableCursor =
(NonDurableCursorImpl) ledger.newNonDurableCursor(getMarkDeletedPosition(), nonDurableCursorName);
if (individualDeletedMessages != null) {
this.individualDeletedMessages.forEach(range -> {
newNonDurableCursor.individualDeletedMessages.addOpenClosed(
range.lowerEndpoint().getLedgerId(),
range.lowerEndpoint().getEntryId(),
range.upperEndpoint().getLedgerId(),
range.upperEndpoint().getEntryId());
return true;
});
}
if (batchDeletedIndexes != null) {
for (Map.Entry<PositionImpl, BitSetRecyclable> entry : this.batchDeletedIndexes.entrySet()) {
BitSetRecyclable copiedBitSet = BitSetRecyclable.valueOf(entry.getValue());
newNonDurableCursor.batchDeletedIndexes.put(entry.getKey(), copiedBitSet);
}
}
return newNonDurableCursor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -530,9 +531,15 @@ public String getTypeString() {
return "Null";
}

@Override
public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position) {

final ManagedLedger managedLedger = topic.getManagedLedger();
final String newNonDurableCursorName = "analyze-backlog-" + UUID.randomUUID();
ManagedCursor newNonDurableCursor;
try {
newNonDurableCursor = ((ManagedCursorImpl) cursor).duplicateNonDurableCursor(newNonDurableCursorName);
} catch (ManagedLedgerException e) {
return CompletableFuture.failedFuture(e);
}
long start = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Starting to analyze backlog", topicName, subName);
Expand All @@ -547,7 +554,7 @@ public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position>
AtomicLong rejectedMessages = new AtomicLong();
AtomicLong rescheduledMessages = new AtomicLong();

Position currentPosition = cursor.getMarkDeletedPosition();
Position currentPosition = newNonDurableCursor.getMarkDeletedPosition();

if (log.isDebugEnabled()) {
log.debug("[{}][{}] currentPosition {}",
Expand Down Expand Up @@ -607,7 +614,7 @@ public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position>

return true;
};
return cursor.scan(
CompletableFuture<AnalyzeBacklogResult> res = newNonDurableCursor.scan(
position,
condition,
batchSize,
Expand All @@ -634,7 +641,22 @@ public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position>
topicName, subName, end - start, result);
return result;
});
res.whenComplete((__, ex) -> {
managedLedger.asyncDeleteCursor(newNonDurableCursorName,
new AsyncCallbacks.DeleteCursorCallback(){
@Override
public void deleteCursorComplete(Object ctx) {
// Nothing to do.
}

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}][{}] Delete non-durable cursor[{}] failed when analyze backlog.",
topicName, subName, newNonDurableCursor.getName());
}
}, null);
});
return res;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3389,4 +3389,33 @@ private void testSetBacklogQuotasNamespaceLevelIfRetentionExists() throws Except
// cleanup.
admin.namespaces().deleteNamespace(ns);
}

@Test
private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws Exception {
final String topic = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
final String subscription = "s1";
admin.topics().createNonPartitionedTopic(topic);
// Send 10 messages.
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(subscription)
.receiverQueueSize(0).subscribe();
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
for (int i = 0; i < 10; i++) {
producer.send(i + "");
}

// Verify consumer can receive all messages after calling "analyzeSubscriptionBacklog".
admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.of(MessageIdImpl.earliest));
for (int i = 0; i < 10; i++) {
Awaitility.await().untilAsserted(() -> {
Message m = consumer.receive();
assertNotNull(m);
consumer.acknowledge(m);
});
}

// cleanup.
consumer.close();
producer.close();
admin.topics().delete(topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,17 @@ private void verifyBacklog(String topic, String subscription, int numEntries, in
AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklogResult
= admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.empty());

assertEquals(numEntries, analyzeSubscriptionBacklogResult.getEntries());
assertEquals(numEntries, analyzeSubscriptionBacklogResult.getFilterAcceptedEntries());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRejectedEntries());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
assertEquals(analyzeSubscriptionBacklogResult.getEntries(), numEntries);
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedEntries(), numEntries);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedEntries(), 0);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);

assertEquals(numMessages, analyzeSubscriptionBacklogResult.getMessages());
assertEquals(numMessages, analyzeSubscriptionBacklogResult.getFilterAcceptedMessages());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRejectedMessages());
assertEquals(analyzeSubscriptionBacklogResult.getMessages(), numMessages);
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(), numMessages);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0);

assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledMessages());
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledMessages(), 0);
assertFalse(analyzeSubscriptionBacklogResult.isAborted());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ public static BitSetRecyclable valueOf(byte[] bytes) {
return BitSetRecyclable.valueOf(ByteBuffer.wrap(bytes));
}

/**
* Copy a BitSetRecyclable.
*/
public static BitSetRecyclable valueOf(BitSetRecyclable src) {
// The internal implementation will do the array-copy.
return valueOf(src.words);
}

/**
* Returns a new bit set containing all the bits in the given byte
* buffer between its position and limit.
Expand Down

0 comments on commit 825e997

Please sign in to comment.