From 220a3d601602d67f5f44516c5d9895dfaa270380 Mon Sep 17 00:00:00 2001 From: atomchen <492672043@qq.com> Date: Sun, 18 Feb 2024 15:51:49 +0800 Subject: [PATCH] =?UTF-8?q?[fix][broker]Support=20setting=20`autoSkipNonRe?= =?UTF-8?q?coverableData`=20dynamically=20in=20expiryMon=E2=80=A6=20(#2199?= =?UTF-8?q?1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: atomchchen --- .../PersistentMessageExpiryMonitor.java | 10 ++++--- .../persistent/PersistentTopicTest.java | 26 +++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 978cd3f886f16..5d3596d0d05eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import com.google.common.annotations.VisibleForTesting; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -48,7 +49,6 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag private final String topicName; private final Rate msgExpired; private final LongAdder totalMsgExpired; - private final boolean autoSkipNonRecoverableData; private final PersistentSubscription subscription; private static final int FALSE = 0; @@ -68,8 +68,12 @@ public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscription this.subscription = subscription; this.msgExpired = new Rate(); this.totalMsgExpired = new LongAdder(); + } + + @VisibleForTesting + public boolean isAutoSkipNonRecoverableData() { // check to avoid test failures - this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null + return this.cursor.getManagedLedger() != null && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData(); } @@ -196,7 +200,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional if (log.isDebugEnabled()) { log.debug("[{}][{}] Finding expired entry operation failed", topicName, subName, exception); } - if (autoSkipNonRecoverableData && failedReadPosition.isPresent() + if (isAutoSkipNonRecoverableData() && failedReadPosition.isPresent() && (exception instanceof NonRecoverableLedgerException)) { log.warn("[{}][{}] read failed from ledger at position:{} : {}", topicName, subName, failedReadPosition, exception.getMessage()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 4eb2aa15fa292..06a46f86c034e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -635,4 +635,30 @@ public void testCheckPersistencePolicies() throws Exception { assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionSizeInMB(), 1L); assertEquals(persistentTopic.getManagedLedger().getConfig().getRetentionTimeMillis(), TimeUnit.MINUTES.toMillis(1)); } + + @Test + public void testDynamicConfigurationAutoSkipNonRecoverableData() throws Exception { + pulsar.getConfiguration().setAutoSkipNonRecoverableData(false); + final String topicName = "persistent://prop/ns-abc/testAutoSkipNonRecoverableData"; + final String subName = "test_sub"; + + Consumer subscribe = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + PersistentSubscription subscription = persistentTopic.getSubscription(subName); + + assertFalse(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData()); + assertFalse(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData()); + + String key = "autoSkipNonRecoverableData"; + admin.brokers().updateDynamicConfiguration(key, "true"); + Awaitility.await() + .untilAsserted(() -> assertEquals(admin.brokers().getAllDynamicConfigurations().get(key), "true")); + + assertTrue(persistentTopic.ledger.getConfig().isAutoSkipNonRecoverableData()); + assertTrue(subscription.getExpiryMonitor().isAutoSkipNonRecoverableData()); + + subscribe.close(); + admin.topics().delete(topicName); + } }