From ad9830764aef8c605551ece8af37b34b74c66285 Mon Sep 17 00:00:00 2001 From: liudezhi Date: Wed, 14 May 2025 14:23:28 +0800 Subject: [PATCH 1/5] optimize subscription snapshot cache algorithm --- .../service/AbstractBaseDispatcher.java | 6 +- .../pulsar/broker/service/Subscription.java | 2 +- .../persistent/PersistentSubscription.java | 7 +- .../ReplicatedSubscriptionSnapshotCache.java | 68 +++++++++- ...plicatedSubscriptionSnapshotCacheTest.java | 122 +++++++++++++++--- 5 files changed, 173 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index f074f234b873f..7308b404352e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -211,7 +211,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) { final int readerIndex = metadataAndPayload.readerIndex(); - processReplicatedSubscriptionSnapshot(pos, metadataAndPayload); + processReplicatedSubscriptionSnapshot(pos, metadataAndPayload, msgMetadata.getPublishTime()); metadataAndPayload.readerIndex(readerIndex); } @@ -358,13 +358,13 @@ protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int con && maxConsumersPerSubscription <= consumerSize; } - private void processReplicatedSubscriptionSnapshot(Position pos, ByteBuf headersAndPayload) { + private void processReplicatedSubscriptionSnapshot(Position pos, ByteBuf headersAndPayload, long publishTime) { // Remove the protobuf headers Commands.skipMessageMetadata(headersAndPayload); try { ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload); - subscription.processReplicatedSubscriptionSnapshot(snapshot); + subscription.processReplicatedSubscriptionSnapshot(snapshot, publishTime); } catch (Throwable t) { log.warn("Failed to process replicated subscription snapshot at {} -- {}", pos, t.getMessage(), t); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 452c30b45febb..6573c5f98f18f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -106,7 +106,7 @@ default long getNumberOfEntriesDelayed() { boolean isSubscriptionMigrated(); - default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) { + default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) { // Default is no-op } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 97b4dc06d0837..f73e1b95cb243 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -210,7 +210,8 @@ public boolean setReplicated(boolean replicated) { this.replicatedSubscriptionSnapshotCache = null; } else if (this.replicatedSubscriptionSnapshotCache == null) { this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName, - config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription()); + config.getReplicatedSubscriptionsSnapshotMaxCachedPerSubscription(), + config.getReplicatedSubscriptionsSnapshotFrequencyMillis()); } if (this.cursor != null) { @@ -1523,10 +1524,10 @@ protected Map mergeCursorProperties(Map userProperti } @Override - public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) { + public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) { ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache; if (snapshotCache != null) { - snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot)); + snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot), publishTime); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java index f78aabfd821c3..cc4d45510ce67 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.service.persistent; +import java.util.Iterator; +import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import lombok.extern.slf4j.Slf4j; @@ -33,15 +35,20 @@ public class ReplicatedSubscriptionSnapshotCache { private final String subscription; private final NavigableMap snapshots; + private final NavigableMap positionToTimestamp; private final int maxSnapshotToCache; + private final int snapshotFrequencyMillis; - public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache) { + public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotToCache, + int snapshotFrequencyMillis) { this.subscription = subscription; this.snapshots = new TreeMap<>(); + this.positionToTimestamp = new TreeMap<>(); this.maxSnapshotToCache = maxSnapshotToCache; + this.snapshotFrequencyMillis = snapshotFrequencyMillis; } - public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot) { + public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) { MarkersMessageIdData msgId = snapshot.getLocalMessageId(); Position position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); @@ -50,12 +57,60 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot snapshot.getSnapshotId()); } - snapshots.put(position, snapshot); + if (positionToTimestamp.lastEntry() == null) { + snapshots.put(position, snapshot); + positionToTimestamp.put(position, publishTime); + return; + } + + final long timeSinceFirstSnapshot = publishTime - positionToTimestamp.firstEntry().getValue(); + final long timeSinceLastSnapshot = publishTime - positionToTimestamp.lastEntry().getValue(); + final long timeWindowPerSlot = timeSinceFirstSnapshot / snapshotFrequencyMillis / maxSnapshotToCache; + // reset cursor + if (position.compareTo(positionToTimestamp.firstKey()) < 0) { + positionToTimestamp.clear(); + snapshots.clear(); + snapshots.put(position, snapshot); + positionToTimestamp.put(position, publishTime); + return; + } else if (position.compareTo(positionToTimestamp.lastKey()) < 0) { + while (position.compareTo(positionToTimestamp.lastKey()) < 0) { + positionToTimestamp.pollLastEntry(); + snapshots.pollLastEntry(); + } + snapshots.put(position, snapshot); + positionToTimestamp.put(position, publishTime); + } + + if (timeSinceLastSnapshot < snapshotFrequencyMillis || timeSinceLastSnapshot < timeWindowPerSlot) { + return; + } + if (snapshots.size() < maxSnapshotToCache) { + snapshots.put(position, snapshot); + positionToTimestamp.put(position, publishTime); + } else { + int medianIndex = maxSnapshotToCache / 2; + Position positionToRemove = findPositionByIndex(medianIndex); + if (positionToRemove != null) { + positionToTimestamp.remove(positionToRemove); + snapshots.remove(positionToRemove); + } + positionToTimestamp.put(position, publishTime); + snapshots.put(position, snapshot); + } + } - // Prune the cache - while (snapshots.size() > maxSnapshotToCache) { - snapshots.pollFirstEntry(); + private Position findPositionByIndex(int targetIndex) { + Iterator> it = positionToTimestamp.entrySet().iterator(); + int currentIndex = 0; + while (it.hasNext()) { + Map.Entry entry = it.next(); + if (currentIndex == targetIndex) { + return entry.getKey(); + } + currentIndex++; } + return null; } /** @@ -73,6 +128,7 @@ public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(P // This snapshot is potentially good. Continue the search for to see if there is a higher snapshot we // can use snapshot = snapshots.pollFirstEntry().getValue(); + positionToTimestamp.pollFirstEntry(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java index 1587c4965c388..d4a2e72181228 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCacheTest.java @@ -28,10 +28,13 @@ @Test(groups = "broker") public class ReplicatedSubscriptionSnapshotCacheTest { + int snapshotFrequencyMillis = 1000; @Test public void testSnapshotCache() { - ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 10); + + ReplicatedSubscriptionSnapshotCache cache = + new ReplicatedSubscriptionSnapshotCache("my-subscription", 10, snapshotFrequencyMillis); assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0, 0))); assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(100, 0))); @@ -51,11 +54,11 @@ public void testSnapshotCache() { ReplicatedSubscriptionsSnapshot s7 = new ReplicatedSubscriptionsSnapshot() .setSnapshotId("snapshot-7"); s7.setLocalMessageId().setLedgerId(7 ).setEntryId(7); - - cache.addNewSnapshot(s1); - cache.addNewSnapshot(s2); - cache.addNewSnapshot(s5); - cache.addNewSnapshot(s7); + long publishTime = System.currentTimeMillis(); + cache.addNewSnapshot(s1, publishTime); + cache.addNewSnapshot(s2, publishTime + snapshotFrequencyMillis); + cache.addNewSnapshot(s5, publishTime + 2L * snapshotFrequencyMillis); + cache.addNewSnapshot(s7, publishTime + 3L * snapshotFrequencyMillis); assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(0, 0))); assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1, 0))); @@ -72,9 +75,73 @@ public void testSnapshotCache() { assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(5, 5))); } + @Test + public void testSnapshotCacheByRestCursor() { + + ReplicatedSubscriptionSnapshotCache cache = + new ReplicatedSubscriptionSnapshotCache("my-subscription", 10, snapshotFrequencyMillis); + // The rest cursor is smaller than the cache first position + ReplicatedSubscriptionsSnapshot s1 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-1"); + s1.setLocalMessageId().setLedgerId(10).setEntryId(10); + + ReplicatedSubscriptionsSnapshot s2 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-2"); + s2.setLocalMessageId().setLedgerId(20).setEntryId(20); + + ReplicatedSubscriptionsSnapshot s3 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-3"); + s3.setLocalMessageId().setLedgerId(30).setEntryId(30); + + ReplicatedSubscriptionsSnapshot s4 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-4"); + s4.setLocalMessageId().setLedgerId(1).setEntryId(1); + + ReplicatedSubscriptionsSnapshot s5 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-5"); + s5.setLocalMessageId().setLedgerId(10).setEntryId(10); + long publishTime = System.currentTimeMillis(); + cache.addNewSnapshot(s1, publishTime); + cache.addNewSnapshot(s2, publishTime + 2L * snapshotFrequencyMillis); + cache.addNewSnapshot(s3, publishTime + 3L * snapshotFrequencyMillis); + cache.addNewSnapshot(s4, publishTime + 4L * snapshotFrequencyMillis); + cache.addNewSnapshot(s5, publishTime + 5L * snapshotFrequencyMillis); + + ReplicatedSubscriptionsSnapshot snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(1, 1)); + assertEquals(snapshot.getSnapshotId(), "snapshot-4"); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(10, 10)); + assertEquals(snapshot.getSnapshotId(), "snapshot-5"); + assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(20, 20))); + assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(30, 30))); + + // The rest cursor is smaller than the cache last position + ReplicatedSubscriptionsSnapshot s6 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-6"); + s6.setLocalMessageId().setLedgerId(10).setEntryId(10); + ReplicatedSubscriptionsSnapshot s7 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-7"); + s7.setLocalMessageId().setLedgerId(20).setEntryId(20); + ReplicatedSubscriptionsSnapshot s8 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-8"); + s8.setLocalMessageId().setLedgerId(30).setEntryId(30); + ReplicatedSubscriptionsSnapshot s9 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-9"); + s9.setLocalMessageId().setLedgerId(20).setEntryId(20); + cache.addNewSnapshot(s6, publishTime + 6L * snapshotFrequencyMillis); + cache.addNewSnapshot(s7, publishTime + 7L * snapshotFrequencyMillis); + cache.addNewSnapshot(s8, publishTime + 8L * snapshotFrequencyMillis); + cache.addNewSnapshot(s9, publishTime + 9L * snapshotFrequencyMillis); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(10, 10)); + assertEquals(snapshot.getSnapshotId(), "snapshot-6"); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(20, 20)); + assertEquals(snapshot.getSnapshotId(), "snapshot-9"); + assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(30, 30))); + } + @Test public void testSnapshotCachePruning() { - ReplicatedSubscriptionSnapshotCache cache = new ReplicatedSubscriptionSnapshotCache("my-subscription", 3); + ReplicatedSubscriptionSnapshotCache cache = + new ReplicatedSubscriptionSnapshotCache("my-subscription", 4, snapshotFrequencyMillis); ReplicatedSubscriptionsSnapshot s1 = new ReplicatedSubscriptionsSnapshot() .setSnapshotId("snapshot-1"); @@ -91,20 +158,37 @@ public void testSnapshotCachePruning() { ReplicatedSubscriptionsSnapshot s4 = new ReplicatedSubscriptionsSnapshot() .setSnapshotId("snapshot-4"); s4.setLocalMessageId().setLedgerId(4).setEntryId(4); - - cache.addNewSnapshot(s1); - cache.addNewSnapshot(s2); - cache.addNewSnapshot(s3); - cache.addNewSnapshot(s4); - - // Snapshot-1 was already pruned - assertNull(cache.advancedMarkDeletePosition(PositionFactory.create(1, 1))); + ReplicatedSubscriptionsSnapshot s5 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-5"); + s5.setLocalMessageId().setLedgerId(5).setEntryId(5); + ReplicatedSubscriptionsSnapshot s6 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-6"); + s6.setLocalMessageId().setLedgerId(6).setEntryId(6); + ReplicatedSubscriptionsSnapshot s7 = new ReplicatedSubscriptionsSnapshot() + .setSnapshotId("snapshot-7"); + s7.setLocalMessageId().setLedgerId(7).setEntryId(7); + + long publishTime = System.currentTimeMillis(); + cache.addNewSnapshot(s1, publishTime + snapshotFrequencyMillis); + cache.addNewSnapshot(s2, publishTime + 2L * snapshotFrequencyMillis); + cache.addNewSnapshot(s3, publishTime + 3L * snapshotFrequencyMillis); + cache.addNewSnapshot(s4, publishTime + 4L * snapshotFrequencyMillis); + cache.addNewSnapshot(s5, publishTime + 5L * snapshotFrequencyMillis); + cache.addNewSnapshot(s6, publishTime + 5L * snapshotFrequencyMillis); + cache.addNewSnapshot(s7, publishTime + 7L * snapshotFrequencyMillis); + // snapshots = [s1, s2, s5, s7] + cache.advancedMarkDeletePosition(PositionFactory.create(1, 1)); ReplicatedSubscriptionsSnapshot snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(2, 2)); - assertNotNull(snapshot); assertEquals(snapshot.getSnapshotId(), "snapshot-2"); - + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(3, 3)); + assertNull(snapshot); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(4, 4)); + assertNull(snapshot); snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(5, 5)); - assertNotNull(snapshot); - assertEquals(snapshot.getSnapshotId(), "snapshot-4"); + assertEquals(snapshot.getSnapshotId(), "snapshot-5"); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(6, 6)); + assertNull(snapshot); + snapshot = cache.advancedMarkDeletePosition(PositionFactory.create(7, 7)); + assertEquals(snapshot.getSnapshotId(), "snapshot-7"); } } From 586f879cb65ea8636df5f123b7d0e8fb551126d9 Mon Sep 17 00:00:00 2001 From: liudezhi Date: Wed, 14 May 2025 21:42:44 +0800 Subject: [PATCH 2/5] add code comment --- .../ReplicatedSubscriptionSnapshotCache.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java index cc4d45510ce67..f876af215e364 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java @@ -35,6 +35,7 @@ public class ReplicatedSubscriptionSnapshotCache { private final String subscription; private final NavigableMap snapshots; + // Used to record the timestamp of snapshots location, which will be used to adjust cache update frequency later. private final NavigableMap positionToTimestamp; private final int maxSnapshotToCache; private final int snapshotFrequencyMillis; @@ -56,24 +57,30 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot log.debug("[{}] Added new replicated-subscription snapshot at {} -- {}", subscription, position, snapshot.getSnapshotId()); } - + // Case 1: cache if empty if (positionToTimestamp.lastEntry() == null) { snapshots.put(position, snapshot); positionToTimestamp.put(position, publishTime); return; } + // The time difference between the previous position and the earliest cache entry final long timeSinceFirstSnapshot = publishTime - positionToTimestamp.firstEntry().getValue(); + // The time difference between the previous position and the lately cache entry final long timeSinceLastSnapshot = publishTime - positionToTimestamp.lastEntry().getValue(); + // The time window length of each time slot, used for dynamic adjustment in the snapshot cache. + // The larger the time slot, the slower the update. final long timeWindowPerSlot = timeSinceFirstSnapshot / snapshotFrequencyMillis / maxSnapshotToCache; - // reset cursor + if (position.compareTo(positionToTimestamp.firstKey()) < 0) { + // Case 2: Reset cursor if position precedes first entry positionToTimestamp.clear(); snapshots.clear(); snapshots.put(position, snapshot); positionToTimestamp.put(position, publishTime); return; } else if (position.compareTo(positionToTimestamp.lastKey()) < 0) { + // Case 3: Reset cursor If the position is in the middle, delete the cache after that position while (position.compareTo(positionToTimestamp.lastKey()) < 0) { positionToTimestamp.pollLastEntry(); snapshots.pollLastEntry(); @@ -81,14 +88,18 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot snapshots.put(position, snapshot); positionToTimestamp.put(position, publishTime); } - + // Time-based eviction conditions + // timeSinceLastSnapshot < snapshotFrequencyMillis, keep the same frequency + // timeSinceLastSnapshot < timeWindowPerSlot, implementing dynamic adjustments if (timeSinceLastSnapshot < snapshotFrequencyMillis || timeSinceLastSnapshot < timeWindowPerSlot) { return; } if (snapshots.size() < maxSnapshotToCache) { + // Case 4: Add to cache if not full snapshots.put(position, snapshot); positionToTimestamp.put(position, publishTime); } else { + // Case 5: Median-based eviction when cache is full int medianIndex = maxSnapshotToCache / 2; Position positionToRemove = findPositionByIndex(medianIndex); if (positionToRemove != null) { @@ -100,6 +111,9 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot } } + /** + * Find the Position in NavigableMap according to the target index. + */ private Position findPositionByIndex(int targetIndex) { Iterator> it = positionToTimestamp.entrySet().iterator(); int currentIndex = 0; From 2aaaffdece7e227d679485f7b3f6df44dd0a95f6 Mon Sep 17 00:00:00 2001 From: liudezhi Date: Thu, 15 May 2025 11:35:37 +0800 Subject: [PATCH 3/5] Simplify the code --- .../ReplicatedSubscriptionSnapshotCache.java | 54 ++++++++++++------- .../src/main/proto/PulsarMarkers.proto | 1 + 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java index f876af215e364..86796a78dccdd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java @@ -35,8 +35,6 @@ public class ReplicatedSubscriptionSnapshotCache { private final String subscription; private final NavigableMap snapshots; - // Used to record the timestamp of snapshots location, which will be used to adjust cache update frequency later. - private final NavigableMap positionToTimestamp; private final int maxSnapshotToCache; private final int snapshotFrequencyMillis; @@ -44,12 +42,37 @@ public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotT int snapshotFrequencyMillis) { this.subscription = subscription; this.snapshots = new TreeMap<>(); - this.positionToTimestamp = new TreeMap<>(); this.maxSnapshotToCache = maxSnapshotToCache; this.snapshotFrequencyMillis = snapshotFrequencyMillis; } + /** + * Adds a new replicated subscription snapshot to the cache with time-based eviction policies. + * This method handles 5 distinct cases for cache management: + *
    + *
  1. Empty Cache: Directly inserts the snapshot if cache is empty.
  2. + *
  3. Position Precedes First Entry: Clears entire cache and inserts new snapshot.
  4. + *
  5. Position in Middle: Trims later entries and inserts new snapshot.
  6. + *
  7. Cache Not Full: Inserts snapshot if cache has available space.
  8. + *
  9. Cache Full: Evicts median-aged entry before insertion (time-window optimized).
  10. + *
+ * + *

Time-based eviction considers two conditions: + *

    + *
  • Minimum frequency interval ({@code snapshotFrequencyMillis})
  • + *
  • Dynamic time window per slot ({@code timeWindowPerSlot})
  • + *
+ * + * @param snapshot The replicated subscription snapshot to add (non-null) + * @param publishTime The ReplicatedSubscriptionsSnapshot marker message (entry), + * the publishTime originates from the same broker, the time is reliable and clock skew is + * not a problem. + * @throws NullPointerException If snapshot argument is null + * @see ReplicatedSubscriptionsSnapshot + * @see #findPositionByIndex(int) For median position calculation in eviction case + */ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) { + snapshot.setMarkersTimestamp(publishTime); MarkersMessageIdData msgId = snapshot.getLocalMessageId(); Position position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); @@ -58,35 +81,30 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot snapshot.getSnapshotId()); } // Case 1: cache if empty - if (positionToTimestamp.lastEntry() == null) { + if (snapshots.lastEntry() == null) { snapshots.put(position, snapshot); - positionToTimestamp.put(position, publishTime); return; } // The time difference between the previous position and the earliest cache entry - final long timeSinceFirstSnapshot = publishTime - positionToTimestamp.firstEntry().getValue(); + final long timeSinceFirstSnapshot = publishTime - snapshots.firstEntry().getValue().getMarkersTimestamp(); // The time difference between the previous position and the lately cache entry - final long timeSinceLastSnapshot = publishTime - positionToTimestamp.lastEntry().getValue(); + final long timeSinceLastSnapshot = publishTime - snapshots.lastEntry().getValue().getMarkersTimestamp(); // The time window length of each time slot, used for dynamic adjustment in the snapshot cache. // The larger the time slot, the slower the update. final long timeWindowPerSlot = timeSinceFirstSnapshot / snapshotFrequencyMillis / maxSnapshotToCache; - if (position.compareTo(positionToTimestamp.firstKey()) < 0) { + if (position.compareTo(snapshots.firstKey()) < 0) { // Case 2: Reset cursor if position precedes first entry - positionToTimestamp.clear(); snapshots.clear(); snapshots.put(position, snapshot); - positionToTimestamp.put(position, publishTime); return; - } else if (position.compareTo(positionToTimestamp.lastKey()) < 0) { + } else if (position.compareTo(snapshots.lastKey()) < 0) { // Case 3: Reset cursor If the position is in the middle, delete the cache after that position - while (position.compareTo(positionToTimestamp.lastKey()) < 0) { - positionToTimestamp.pollLastEntry(); + while (position.compareTo(snapshots.lastKey()) < 0) { snapshots.pollLastEntry(); } snapshots.put(position, snapshot); - positionToTimestamp.put(position, publishTime); } // Time-based eviction conditions // timeSinceLastSnapshot < snapshotFrequencyMillis, keep the same frequency @@ -97,16 +115,13 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot if (snapshots.size() < maxSnapshotToCache) { // Case 4: Add to cache if not full snapshots.put(position, snapshot); - positionToTimestamp.put(position, publishTime); } else { // Case 5: Median-based eviction when cache is full int medianIndex = maxSnapshotToCache / 2; Position positionToRemove = findPositionByIndex(medianIndex); if (positionToRemove != null) { - positionToTimestamp.remove(positionToRemove); snapshots.remove(positionToRemove); } - positionToTimestamp.put(position, publishTime); snapshots.put(position, snapshot); } } @@ -115,10 +130,10 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot * Find the Position in NavigableMap according to the target index. */ private Position findPositionByIndex(int targetIndex) { - Iterator> it = positionToTimestamp.entrySet().iterator(); + Iterator> it = snapshots.entrySet().iterator(); int currentIndex = 0; while (it.hasNext()) { - Map.Entry entry = it.next(); + Map.Entry entry = it.next(); if (currentIndex == targetIndex) { return entry.getKey(); } @@ -142,7 +157,6 @@ public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(P // This snapshot is potentially good. Continue the search for to see if there is a higher snapshot we // can use snapshot = snapshots.pollFirstEntry().getValue(); - positionToTimestamp.pollFirstEntry(); } } diff --git a/pulsar-common/src/main/proto/PulsarMarkers.proto b/pulsar-common/src/main/proto/PulsarMarkers.proto index 40f16f3272844..aeae285f4bc88 100644 --- a/pulsar-common/src/main/proto/PulsarMarkers.proto +++ b/pulsar-common/src/main/proto/PulsarMarkers.proto @@ -60,6 +60,7 @@ message ReplicatedSubscriptionsSnapshot { required string snapshot_id = 1; optional MarkersMessageIdData local_message_id = 2; repeated ClusterMessageId clusters = 3; + optional int64 markers_timestamp = 4; } // When the replicated subscription mark-delete position From c412822b340b84a2af0cec2a3a5d502e9cc9f1a8 Mon Sep 17 00:00:00 2001 From: liudezhi Date: Thu, 15 May 2025 21:43:08 +0800 Subject: [PATCH 4/5] Simplify the code --- .../ReplicatedSubscriptionSnapshotCache.java | 27 ++++++++++--------- .../src/main/proto/PulsarMarkers.proto | 1 - 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java index 86796a78dccdd..67e211de46e56 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java @@ -34,7 +34,7 @@ @Slf4j public class ReplicatedSubscriptionSnapshotCache { private final String subscription; - private final NavigableMap snapshots; + private final NavigableMap snapshots; private final int maxSnapshotToCache; private final int snapshotFrequencyMillis; @@ -72,7 +72,7 @@ public ReplicatedSubscriptionSnapshotCache(String subscription, int maxSnapshotT * @see #findPositionByIndex(int) For median position calculation in eviction case */ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot, long publishTime) { - snapshot.setMarkersTimestamp(publishTime); + SnapshotEntry snapshotEntry = new SnapshotEntry(snapshot, publishTime); MarkersMessageIdData msgId = snapshot.getLocalMessageId(); Position position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); @@ -82,14 +82,14 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot } // Case 1: cache if empty if (snapshots.lastEntry() == null) { - snapshots.put(position, snapshot); + snapshots.put(position, snapshotEntry); return; } // The time difference between the previous position and the earliest cache entry - final long timeSinceFirstSnapshot = publishTime - snapshots.firstEntry().getValue().getMarkersTimestamp(); + final long timeSinceFirstSnapshot = publishTime - snapshots.firstEntry().getValue().timestamp(); // The time difference between the previous position and the lately cache entry - final long timeSinceLastSnapshot = publishTime - snapshots.lastEntry().getValue().getMarkersTimestamp(); + final long timeSinceLastSnapshot = publishTime - snapshots.lastEntry().getValue().timestamp(); // The time window length of each time slot, used for dynamic adjustment in the snapshot cache. // The larger the time slot, the slower the update. final long timeWindowPerSlot = timeSinceFirstSnapshot / snapshotFrequencyMillis / maxSnapshotToCache; @@ -97,14 +97,14 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot if (position.compareTo(snapshots.firstKey()) < 0) { // Case 2: Reset cursor if position precedes first entry snapshots.clear(); - snapshots.put(position, snapshot); + snapshots.put(position, snapshotEntry); return; } else if (position.compareTo(snapshots.lastKey()) < 0) { // Case 3: Reset cursor If the position is in the middle, delete the cache after that position while (position.compareTo(snapshots.lastKey()) < 0) { snapshots.pollLastEntry(); } - snapshots.put(position, snapshot); + snapshots.put(position, snapshotEntry); } // Time-based eviction conditions // timeSinceLastSnapshot < snapshotFrequencyMillis, keep the same frequency @@ -114,7 +114,7 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot } if (snapshots.size() < maxSnapshotToCache) { // Case 4: Add to cache if not full - snapshots.put(position, snapshot); + snapshots.put(position, snapshotEntry); } else { // Case 5: Median-based eviction when cache is full int medianIndex = maxSnapshotToCache / 2; @@ -122,7 +122,7 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot if (positionToRemove != null) { snapshots.remove(positionToRemove); } - snapshots.put(position, snapshot); + snapshots.put(position, snapshotEntry); } } @@ -130,10 +130,10 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot * Find the Position in NavigableMap according to the target index. */ private Position findPositionByIndex(int targetIndex) { - Iterator> it = snapshots.entrySet().iterator(); + Iterator> it = snapshots.entrySet().iterator(); int currentIndex = 0; while (it.hasNext()) { - Map.Entry entry = it.next(); + Map.Entry entry = it.next(); if (currentIndex == targetIndex) { return entry.getKey(); } @@ -156,7 +156,7 @@ public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(P } else { // This snapshot is potentially good. Continue the search for to see if there is a higher snapshot we // can use - snapshot = snapshots.pollFirstEntry().getValue(); + snapshot = snapshots.pollFirstEntry().getValue().snapshot(); } } @@ -172,4 +172,7 @@ public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(P } return snapshot; } + + private record SnapshotEntry(ReplicatedSubscriptionsSnapshot snapshot, long timestamp) { + } } diff --git a/pulsar-common/src/main/proto/PulsarMarkers.proto b/pulsar-common/src/main/proto/PulsarMarkers.proto index aeae285f4bc88..40f16f3272844 100644 --- a/pulsar-common/src/main/proto/PulsarMarkers.proto +++ b/pulsar-common/src/main/proto/PulsarMarkers.proto @@ -60,7 +60,6 @@ message ReplicatedSubscriptionsSnapshot { required string snapshot_id = 1; optional MarkersMessageIdData local_message_id = 2; repeated ClusterMessageId clusters = 3; - optional int64 markers_timestamp = 4; } // When the replicated subscription mark-delete position From 5af3574674a3a9b0d234da4bc4ac115ba4215189 Mon Sep 17 00:00:00 2001 From: liudezhi Date: Fri, 16 May 2025 08:53:03 +0800 Subject: [PATCH 5/5] Optimize Annotations --- .../ReplicatedSubscriptionSnapshotCache.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java index 67e211de46e56..fa49fc320c7d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.service.persistent; import java.util.Iterator; -import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import lombok.extern.slf4j.Slf4j; @@ -95,18 +94,20 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot final long timeWindowPerSlot = timeSinceFirstSnapshot / snapshotFrequencyMillis / maxSnapshotToCache; if (position.compareTo(snapshots.firstKey()) < 0) { - // Case 2: Reset cursor if position precedes first entry + // Case 2: When executing 'pulsar-admin topics reset-cursor', reset position for subscription to a position + // position precedes first entry snapshots.clear(); snapshots.put(position, snapshotEntry); return; } else if (position.compareTo(snapshots.lastKey()) < 0) { - // Case 3: Reset cursor If the position is in the middle, delete the cache after that position + // Case 3: When executing 'pulsar-admin topics reset-cursor', reset position for subscription to a position + // the position is in the middle, delete the cache after that position while (position.compareTo(snapshots.lastKey()) < 0) { snapshots.pollLastEntry(); } snapshots.put(position, snapshotEntry); } - // Time-based eviction conditions + // omit cache // timeSinceLastSnapshot < snapshotFrequencyMillis, keep the same frequency // timeSinceLastSnapshot < timeWindowPerSlot, implementing dynamic adjustments if (timeSinceLastSnapshot < snapshotFrequencyMillis || timeSinceLastSnapshot < timeWindowPerSlot) { @@ -130,12 +131,12 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot * Find the Position in NavigableMap according to the target index. */ private Position findPositionByIndex(int targetIndex) { - Iterator> it = snapshots.entrySet().iterator(); + Iterator it = snapshots.keySet().iterator(); int currentIndex = 0; while (it.hasNext()) { - Map.Entry entry = it.next(); + Position position = it.next(); if (currentIndex == targetIndex) { - return entry.getKey(); + return position; } currentIndex++; }