diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index f074f234b873f..7308b404352e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -211,7 +211,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) { final int readerIndex = metadataAndPayload.readerIndex(); - processReplicatedSubscriptionSnapshot(pos, metadataAndPayload); + processReplicatedSubscriptionSnapshot(pos, metadataAndPayload, msgMetadata.getPublishTime()); metadataAndPayload.readerIndex(readerIndex); } @@ -358,13 +358,13 @@ protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int con && maxConsumersPerSubscription <= consumerSize; } - private void processReplicatedSubscriptionSnapshot(Position pos, ByteBuf headersAndPayload) { + private void processReplicatedSubscriptionSnapshot(Position pos, ByteBuf headersAndPayload, long publishTime) { // Remove the protobuf headers Commands.skipMessageMetadata(headersAndPayload); try { ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload); - subscription.processReplicatedSubscriptionSnapshot(snapshot); + subscription.processReplicatedSubscriptionSnapshot(snapshot, publishTime); } catch (Throwable t) { log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 452c30b45febb..6573c5f98f18f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -106,7 +106,7 @@ default long getNumberOfEntriesDelayed() { boolean isSubscriptionMigrated(); - default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) { + default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) { // Default is no-op } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 97b4dc06d0837..f73e1b95cb243 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -210,7 +210,8 @@ public boolean setReplicated(boolean replicated) { this.replicatedSubscriptionSnapshotCache = null; } else if (this.replicatedSubscriptionSnapshotCache == null) { this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName, - config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()); + config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription(), + config.getReplicatedSubscriptionsSnapshotFrequencyMillis()); } if (this.cursor != null) { @@ -1523,10 +1524,10 @@ protected Map mergeCursorProperties(Map userProperti } @Override - public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) { + public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) { ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache; if (snapshotCache != null) { - snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot)); + snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot), 
publishTime);
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
index f78aabfd821c3..fa49fc320c7d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;

+import java.util.Iterator;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import lombok.extern.slf4j.Slf4j;
@@ -32,16 +33,45 @@
 @Slf4j
 public class ReplicatedSubscriptionSnapshotCache {
     private final String subscription;
-    private final NavigableMap<Position, ReplicatedSubscriptionsSnapshot> snapshots;
+    private final NavigableMap<Position, SnapshotEntry> snapshots;
     private final int maxSnapshotToCache;
+    private final int snapshotFrequencyMillis;

-    public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache) {
+    public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache,
+                                               int snapshotFrequencyMillis) {
         this.subscription = subscription;
         this.snapshots = new TreeMap<>();
         this.maxSnapshotToCache = maxSnapshotToCache;
+        this.snapshotFrequencyMillis = snapshotFrequencyMillis;
     }

-    public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot) {
+    /**
+     * Adds a new replicated subscription snapshot to the cache with time-based eviction policies.
+     * This method handles 5 distinct cases for cache management:
+     * <ol>
+     *   <li>Empty Cache: Directly inserts the snapshot if cache is empty.</li>
+     *   <li>Position Precedes First Entry: Clears entire cache and inserts new snapshot.</li>
+     *   <li>Position in Middle: Trims later entries, inserts new snapshot and returns.</li>
+     *   <li>Cache Not Full: Inserts snapshot if cache has available space.</li>
+     *   <li>Cache Full: Evicts the entry at the middle index before insertion.</li>
+     * </ol>
+     *
+     * <p>Cases 4 and 5 are skipped when the time since the newest cached snapshot is below either:
+     * <ul>
+     *   <li>Minimum frequency interval ({@code snapshotFrequencyMillis})</li>
+     *   <li>Dynamic time window per slot ({@code timeWindowPerSlot})</li>
+     * </ul>
+     *
+     * @param snapshot The replicated subscription snapshot to add (non-null)
+     * @param publishTime The publish time of the ReplicatedSubscriptionsSnapshot marker message (entry);
+     * the publishTime originates from the same broker, the time is reliable and clock skew is
+     * not a problem.
+     * @throws NullPointerException If snapshot argument is null
+     * @see ReplicatedSubscriptionsSnapshot
+     * @see #findPositionByIndex(int) For median position calculation in eviction case
+     */
+    public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) {
+        SnapshotEntry snapshotEntry = new SnapshotEntry(snapshot, publishTime);
         MarkersMessageIdData msgId = snapshot.getLocalMessageId();
         Position position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId());

@@ -49,15 +79,70 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot
             log.debug("[{}] Added new replicated-subscription snapshot at {} -- {}", subscription, position,
                     snapshot.getSnapshotId());
         }
+        // Case 1: cache if empty
+        if (snapshots.isEmpty()) {
+            snapshots.put(position, snapshotEntry);
+            return;
+        }

-        snapshots.put(position, snapshot);
+        // Time elapsed between the oldest cached snapshot and this one
+        final long timeSinceFirstSnapshot = publishTime - snapshots.firstEntry().getValue().timestamp();
+        // Time elapsed between the newest cached snapshot and this one
+        final long timeSinceLastSnapshot = publishTime - snapshots.lastEntry().getValue().timestamp();
+        // Dynamic threshold that grows with the age of the oldest cached entry: the longer the span, the slower
+        // the updates. Divisors are clamped to 1 so a 0-valued setting cannot raise ArithmeticException.
+        final long timeWindowPerSlot =
+                timeSinceFirstSnapshot / Math.max(1, snapshotFrequencyMillis) / Math.max(1, maxSnapshotToCache);

-        // Prune the cache
-        while (snapshots.size() > maxSnapshotToCache) {
-            snapshots.pollFirstEntry();
+        if (position.compareTo(snapshots.firstKey()) < 0) {
+            // Case 2: e.g. after 'pulsar-admin topics reset-cursor' the position precedes the first cached entry:
+            // every cached snapshot is ahead of it, so start over from this snapshot
+            snapshots.clear();
+            snapshots.put(position, snapshotEntry);
+            return;
+        } else if (position.compareTo(snapshots.lastKey()) < 0) {
+            // Case 3: e.g. after 'pulsar-admin topics reset-cursor' the position falls inside the cached range:
+            // drop the entries after it, insert, and return so Case 5 cannot evict an entry right after this insert
+            snapshots.tailMap(position, false).clear();
+            snapshots.put(position, snapshotEntry);
+            return;
+        }
+        // Skip caching when the newest cached snapshot is too recent:
+        // timeSinceLastSnapshot < snapshotFrequencyMillis, keep the same frequency
+        // timeSinceLastSnapshot < timeWindowPerSlot, implementing dynamic adjustments
+        if (timeSinceLastSnapshot < snapshotFrequencyMillis || timeSinceLastSnapshot < timeWindowPerSlot) {
+            return;
+        }
+        if (snapshots.size() < maxSnapshotToCache) {
+            // Case 4: Add to cache if not full
+            snapshots.put(position, snapshotEntry);
+        } else {
+            // Case 5: Median-based eviction when cache is full
+            int medianIndex = maxSnapshotToCache / 2;
+            Position positionToRemove = findPositionByIndex(medianIndex);
+            if (positionToRemove != null) {
+                snapshots.remove(positionToRemove);
+            }
+            snapshots.put(position, snapshotEntry);
         }
     }

+    /**
+     * Returns the key at zero-based {@code targetIndex} in ascending key order, or {@code null} if out of range.
+     */
+    private Position findPositionByIndex(int targetIndex) {
+        Iterator<Position> it = snapshots.keySet().iterator();
+        int currentIndex = 0;
+        while (it.hasNext()) {
+            Position position = it.next();
+            if (currentIndex == targetIndex) {
+                return position;
+            }
+            currentIndex++;
+        }
+        return null;
+    }
+
     /**
      * Signal that the mark-delete position on the subscription has been advanced. If there is a snapshot that
      * correspond to this position, it will returned, other it will return null.
@@ -72,7 +157,7 @@ public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(P
             } else {
                 // This snapshot is potentially good. Continue the search for to see if there is a higher snapshot we
                 // can use
-                snapshot = snapshots.pollFirstEntry().getValue();
+                snapshot = snapshots.pollFirstEntry().getValue().snapshot();
             }
         }

@@ -88,4 +173,7 @@ public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(P
         }
         return snapshot;
     }
+
+    private record SnapshotEntry(ReplicatedSubscriptionsSnapshot snapshot, long timestamp) {
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
index 1587c4965c388..d4a2e72181228 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java
@@ -28,10 +28,13 @@
 @Test(groups = "broker")
 public class ReplicatedSubscriptionSnapshotCacheTest {
+    int snapshotFrequencyMillis = 1000;

     @Test
     public void testSnapshotCache() {
-        ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 10);
+
+        ReplicatedSubscriptionSnapshotCache cache =
+                new ReplicatedSubscriptionSnapshotCache("my-subscription", 10,
snapshotFrequencyMillis); assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0, 0))); assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(100, 0))); @@ -51,11 +54,11 @@ public void testSnapshotCache() { ReplicatedSubscriptionsSnapshot s7 = new ReplicatedSubscriptionsSnapshot() .setSnapshotId("snapshot-7"); s7.setLocalMessageId().setLedgerId(7 ).setEntryId(7); - - cache.addNewSnapshot(s1); - cache.addNewSnapshot(s2); - cache.addNewSnapshot(s5); - cache.addNewSnapshot(s7); + long publishTime = System.currentTimeMillis(); + cache.addNewSnapshot(s1, publishTime); + cache.addNewSnapshot(s2, publishTime + snapshotFrequencyMillis); + cache.addNewSnapshot(s5, publishTime + 2L * snapshotFrequencyMillis); + cache.addNewSnapshot(s7, publishTime + 3L * snapshotFrequencyMillis); assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0, 0))); assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1, 0))); @@ -72,9 +75,73 @@ public void testSnapshotCache() { assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(5, 5))); } + @Test + public void testSnapshotCacheByRestCursor() { + + ReplicatedSubscriptionSnapshotCache cache = + new ReplicatedSubscriptionSnapshotCache("my-subscription", 10, snapshotFrequencyMillis); + // The rest cursor is smaller than the cache first position + ReplicatedSubscriptionsSnapshot s1 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-1"); + s1.setLocalMessageId().setLedgerId(10).setEntryId(10); + + ReplicatedSubscriptionsSnapshot s2 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-2"); + s2.setLocalMessageId().setLedgerId(20).setEntryId(20); + + ReplicatedSubscriptionsSnapshot s3 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-3"); + s3.setLocalMessageId().setLedgerId(30).setEntryId(30); + + ReplicatedSubscriptionsSnapshot s4 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-4"); + 
s4.setLocalMessageId().setLedgerId(1).setEntryId(1); + + ReplicatedSubscriptionsSnapshot s5 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-5"); + s5.setLocalMessageId().setLedgerId(10).setEntryId(10); + long publishTime = System.currentTimeMillis(); + cache.addNewSnapshot(s1, publishTime); + cache.addNewSnapshot(s2, publishTime + 2L * snapshotFrequencyMillis); + cache.addNewSnapshot(s3, publishTime + 3L * snapshotFrequencyMillis); + cache.addNewSnapshot(s4, publishTime + 4L * snapshotFrequencyMillis); + cache.addNewSnapshot(s5, publishTime + 5L * snapshotFrequencyMillis); + + ReplicatedSubscriptionsSnapshot snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(1, 1)); + assertEquals(snapshot.getSnapshotId(), "snapshot-4"); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(10, 10)); + assertEquals(snapshot.getSnapshotId(), "snapshot-5"); + assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(20, 20))); + assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(30, 30))); + + // The rest cursor is smaller than the cache last position + ReplicatedSubscriptionsSnapshot s6 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-6"); + s6.setLocalMessageId().setLedgerId(10).setEntryId(10); + ReplicatedSubscriptionsSnapshot s7 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-7"); + s7.setLocalMessageId().setLedgerId(20).setEntryId(20); + ReplicatedSubscriptionsSnapshot s8 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-8"); + s8.setLocalMessageId().setLedgerId(30).setEntryId(30); + ReplicatedSubscriptionsSnapshot s9 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-9"); + s9.setLocalMessageId().setLedgerId(20).setEntryId(20); + cache.addNewSnapshot(s6, publishTime + 6L * snapshotFrequencyMillis); + cache.addNewSnapshot(s7, publishTime + 7L * snapshotFrequencyMillis); + cache.addNewSnapshot(s8, publishTime + 8L * 
snapshotFrequencyMillis); + cache.addNewSnapshot(s9, publishTime + 9L * snapshotFrequencyMillis); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(10, 10)); + assertEquals(snapshot.getSnapshotId(), "snapshot-6"); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(20, 20)); + assertEquals(snapshot.getSnapshotId(), "snapshot-9"); + assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(30, 30))); + } + @Test public void testSnapshotCachePruning() { - ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 3); + ReplicatedSubscriptionSnapshotCache cache = + new ReplicatedSubscriptionSnapshotCache("my-subscription", 4, snapshotFrequencyMillis); ReplicatedSubscriptionsSnapshot s1 = new ReplicatedSubscriptionsSnapshot() .setSnapshotId("snapshot-1"); @@ -91,20 +158,37 @@ public void testSnapshotCachePruning() { ReplicatedSubscriptionsSnapshot s4 = new ReplicatedSubscriptionsSnapshot() .setSnapshotId("snapshot-4"); s4.setLocalMessageId().setLedgerId(4).setEntryId(4); - - cache.addNewSnapshot(s1); - cache.addNewSnapshot(s2); - cache.addNewSnapshot(s3); - cache.addNewSnapshot(s4); - - // Snapshot-1 was already pruned - assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1, 1))); + ReplicatedSubscriptionsSnapshot s5 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-5"); + s5.setLocalMessageId().setLedgerId(5).setEntryId(5); + ReplicatedSubscriptionsSnapshot s6 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-6"); + s6.setLocalMessageId().setLedgerId(6).setEntryId(6); + ReplicatedSubscriptionsSnapshot s7 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-7"); + s7.setLocalMessageId().setLedgerId(7).setEntryId(7); + + long publishTime = System.currentTimeMillis(); + cache.addNewSnapshot(s1, publishTime + snapshotFrequencyMillis); + cache.addNewSnapshot(s2, publishTime + 2L * snapshotFrequencyMillis); + 
cache.addNewSnapshot(s3, publishTime + 3L * snapshotFrequencyMillis); + cache.addNewSnapshot(s4, publishTime + 4L * snapshotFrequencyMillis); + cache.addNewSnapshot(s5, publishTime + 5L * snapshotFrequencyMillis); + cache.addNewSnapshot(s6, publishTime + 5L * snapshotFrequencyMillis); + cache.addNewSnapshot(s7, publishTime + 7L * snapshotFrequencyMillis); + // snapshots = [s1, s2, s5, s7] + cache.advancedMarkDeletePosition(PositionFactory.create(1, 1)); ReplicatedSubscriptionsSnapshot snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(2, 2)); - assertNotNull(snapshot); assertEquals(snapshot.getSnapshotId(), "snapshot-2"); - + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(3, 3)); + assertNull(snapshot); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(4, 4)); + assertNull(snapshot); snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5, 5)); - assertNotNull(snapshot); - assertEquals(snapshot.getSnapshotId(), "snapshot-4"); + assertEquals(snapshot.getSnapshotId(), "snapshot-5"); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(6, 6)); + assertNull(snapshot); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(7, 7)); + assertEquals(snapshot.getSnapshotId(), "snapshot-7"); } }