diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index e31db8bac46d6..a7030c61e7302 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -149,6 +149,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { */ @Getter private boolean metadataServiceAvailable; + private final Runnable cancelSessionListener; private final ManagedLedgerConfig defaultManagedLedgerConfig; private static class PendingInitializeManagedLedger { @@ -257,7 +258,8 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, evictionTaskInterval, evictionTaskInterval, TimeUnit.MILLISECONDS); closed = false; - metadataStore.registerSessionListener(this::handleMetadataStoreNotification); + cancelSessionListener = + metadataStore.registerCancellableSessionListener(this::handleMetadataStoreNotification); openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this); openTelemetryManagedLedgerStats = new OpenTelemetryManagedLedgerStats(openTelemetry, this); @@ -710,6 +712,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { })); }).whenCompleteAsync((__, ___) -> { //wait for tasks in scheduledExecutor executed. + store.close(); + cancelSessionListener.run(); openTelemetryManagedCursorStats.close(); openTelemetryManagedLedgerStats.close(); openTelemetryCacheStats.close(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index 21e12d81a727d..84f26cc94fb40 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -184,4 +184,11 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn * @return a future represents the result of the operation. */ CompletableFuture> getManagedLedgerPropertiesAsync(String name); + + /** + * Close the store. + */ + default void close() { + // Default implementation does nothing + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index 611d9d60202cd..6f3ad6d2a2403 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -60,6 +60,7 @@ public class MetaStoreImpl implements MetaStore, Consumer { private final MetadataStore store; private final OrderedExecutor executor; + private final Runnable cancelMetadataStoreListener; private static final int MAGIC_MANAGED_INFO_METADATA = 0x4778; // 0100 0111 0111 1000 private final MetadataCompressionConfig ledgerInfoCompressionConfig; @@ -74,7 +75,9 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) { this.cursorInfoCompressionConfig = MetadataCompressionConfig.noCompression; managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>(); if (store != null) { - store.registerListener(this); + cancelMetadataStoreListener = store.registerCancellableListener(this); + } else { + cancelMetadataStoreListener = null; } } @@ -87,7 +90,9 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, this.cursorInfoCompressionConfig = cursorInfoCompressionConfig; managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>(); if (store != null) { - store.registerListener(this); + cancelMetadataStoreListener = store.registerCancellableListener(this); + } else { + cancelMetadataStoreListener = null; } } @@ -548,4 +553,9 @@ private CompressionCodec getCompressionCodec(CompressionType compressionType) { org.apache.pulsar.common.api.proto.CompressionType.valueOf(compressionType.name())); } + public void close() { + if (cancelMetadataStoreListener != null) { + cancelMetadataStoreListener.run(); + } + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index 2df4dc22c144f..0001d972c1e40 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import org.apache.bookkeeper.client.DefaultBookieAddressResolver; import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy; import org.apache.bookkeeper.client.RackChangeNotifier; @@ -62,6 +63,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping public static final String BOOKIE_INFO_ROOT_PATH = "/bookies"; public static final String METADATA_STORE_INSTANCE = "METADATA_STORE_INSTANCE"; + public static final String ADD_TO_CLEANUP_CONSUMER_INSTANCE = "ADD_TO_CLEANUP_CONSUMER"; private MetadataCache bookieMappingCache = null; private ITopologyAwareEnsemblePlacementPolicy rackawarePolicy = null; @@ -122,7 +124,12 @@ public synchronized void setConf(Configuration conf) { } bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); - store.registerListener(this::handleUpdates); + Runnable cancelMetadataStoreListener = store.registerCancellableListener(this::handleUpdates); + Consumer addToCleanupConsumer = + (Consumer) conf.getProperty(ADD_TO_CLEANUP_CONSUMER_INSTANCE); + if (addToCleanupConsumer != null) { + addToCleanupConsumer.accept(cancelMetadataStoreListener); + } try { var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java index b0cc50edf1f1d..f80bbe6c31f66 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java @@ -216,8 +216,8 @@ public void setFailureDomainWithCreate(String clusterName, String domainName, setWithCreate(joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN, domainName), createFunction); } - public void registerListener(Consumer listener) { - getStore().registerListener(n -> { + public Runnable registerListener(Consumer listener) { + return getStore().registerCancellableListener(n -> { // Prefilter the notification just for failure domains if (n.getPath().startsWith(BASE_CLUSTERS_PATH) && n.getPath().contains("/" + FAILURE_DOMAIN)) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java index 43376f4055063..52acebdeacd68 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java @@ -61,9 +61,9 @@ public MetadataStoreCacheLoader(PulsarResources pulsarResources, int operationTi * @throws Exception */ public void init() throws Exception { - loadReportResources.getStore().registerListener((n) -> { + loadReportResources.getStore().registerCancellableListener((n) -> { if (LOADBALANCE_BROKERS_ROOT.equals(n.getPath()) && NotificationType.ChildrenChanged.equals(n.getType())) { - loadReportResources.getChildrenAsync(LOADBALANCE_BROKERS_ROOT).thenApplyAsync((brokerNodes)->{ + loadReportResources.getChildrenAsync(LOADBALANCE_BROKERS_ROOT).thenApplyAsync((brokerNodes) -> { updateBrokerList(brokerNodes).thenRun(() -> { log.info("Successfully updated broker info {}", brokerNodes); }).exceptionally(ex -> { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java index 8ce4e5c8e2045..f2d962032dd59 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java @@ -47,7 +47,7 @@ public class TopicResources { public TopicResources(MetadataStore store) { this.store = store; topicListeners = new ConcurrentHashMap<>(); - store.registerListener(this::handleNotification); + store.registerCancellableListener(this::handleNotification); } /*** diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java index 07a4ae195c838..939e744125c16 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java @@ -44,7 +44,7 @@ public void setup() { @Test public void testConstructorRegistersAsListener() { - verify(metadataStore).registerListener(any()); + verify(metadataStore).registerCancellableListener(any()); } @Test diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index 45299d9ed05d5..c531c881c6b8e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -27,10 +27,13 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.channel.EventLoopGroup; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -52,6 +55,7 @@ @Slf4j public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory { + private final List cleanupCallbacks = new CopyOnWriteArrayList<>(); @Override public CompletableFuture create(ServiceConfiguration conf, MetadataStoreExtended store, @@ -70,6 +74,7 @@ public CompletableFuture create(ServiceConfiguration conf, MetadataS PulsarMetadataClientDriver.init(); ClientConfiguration bkConf = createBkClientConfiguration(store, conf); + bkConf.setProperty(BookieRackAffinityMapping.ADD_TO_CLEANUP_CONSUMER_INSTANCE, getAddToCleanupConsumer()); if (properties != null) { properties.forEach(bkConf::setProperty); } @@ -241,8 +246,14 @@ static void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfig } } + // handles cancelling a Metadata Store listener that was registered in BookieRackAffinityMapping + Consumer getAddToCleanupConsumer() { + return cleanupCallbacks::add; + } + @Override public void close() { - // Nothing to do + cleanupCallbacks.forEach(Runnable::run); + cleanupCallbacks.clear(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index fde755403ff7f..a1e3474a3ef86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -306,6 +306,8 @@ public class PulsarService implements AutoCloseable, ShutdownService { private String brokerId; private final CompletableFuture readyForIncomingRequestsFuture = new CompletableFuture<>(); private final List pendingTasksBeforeReadyForIncomingRequests = new ArrayList<>(); + private volatile Runnable cancelSessionListener; + private volatile Runnable cancelMetadataStoreListener; public enum State { Init, Started, Closing, Closed @@ -501,6 +503,15 @@ public CompletableFuture closeAsync() { // It only tells the Pulsar clients that this service is not ready to serve for the lookup requests state = State.Closing; + if (cancelMetadataStoreListener != null) { + cancelMetadataStoreListener.run(); + cancelMetadataStoreListener = null; + } + if (cancelSessionListener != null) { + cancelSessionListener.run(); + cancelSessionListener = null; + } + if (healthChecker != null) { healthChecker.close(); healthChecker = null; @@ -862,7 +873,8 @@ public void start() throws PulsarServerException { : null; localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer, openTelemetry.getOpenTelemetryService().getOpenTelemetry()); - localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent); + cancelSessionListener = + localMetadataStore.registerCancellableSessionListener(this::handleMetadataSessionEvent); coordinationService = new CoordinationServiceImpl(localMetadataStore); @@ -1123,7 +1135,8 @@ protected PulsarResources newPulsarResources() { PulsarResources pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore, config.getMetadataStoreOperationTimeoutSeconds(), getExecutor()); - pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster); + cancelMetadataStoreListener = + pulsarResources.getClusterResources().getStore().registerCancellableListener(this::handleDeleteCluster); return pulsarResources; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 115bbe56ffa35..2da783170bb4c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -67,6 +67,7 @@ public class BrokerRegistryImpl implements BrokerRegistry { private final ScheduledExecutorService scheduler; private final List> listeners; + private Runnable cancelMetadataStoreListener; protected enum State { Init, @@ -111,7 +112,8 @@ public synchronized void start() throws PulsarServerException { if (!this.state.compareAndSet(State.Init, State.Started)) { throw new PulsarServerException("Cannot start the broker registry in state " + state.get()); } - pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); + cancelMetadataStoreListener = + pulsar.getLocalMetadataStore().registerCancellableListener(this::handleMetadataStoreNotification); try { this.registerAsync().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); } catch (ExecutionException | InterruptedException | TimeoutException e) { @@ -236,6 +238,7 @@ public synchronized void close() throws PulsarServerException { if (this.state.get() == State.Closed) { return; } + cancelMetadataStoreListener.run(); try { this.listeners.clear(); this.unregister(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index af95df60174db..65bfbe88eb437 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -186,6 +186,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS private UnloadManager unloadManager; private SplitManager splitManager; + private volatile Runnable cancelFailureDomainResourcesListener; enum State { INIT, @@ -387,7 +388,8 @@ public void start() throws PulsarServerException { this.antiAffinityGroupPolicyHelper = new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel); - antiAffinityGroupPolicyHelper.listenFailureDomainUpdate(); + cancelFailureDomainResourcesListener = + antiAffinityGroupPolicyHelper.listenFailureDomainUpdate(); this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper); this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter); SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar); @@ -796,6 +798,10 @@ public void close() throws PulsarServerException { return; } try { + if (cancelFailureDomainResourcesListener != null) { + cancelFailureDomainResourcesListener.run(); + cancelFailureDomainResourcesListener = null; + } stopLoadDataReportTasks(); this.unloadScheduler.close(); this.splitScheduler.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index ecd98f188b4d1..fd4c8bbb6ca03 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -148,6 +148,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private volatile long lastOwnEventHandledAt = 0; private long lastOwnedServiceUnitCountAt = 0; private int totalOwnedServiceUnitCnt = 0; + private Runnable cancelSessionListener; public enum EventType { Assign, @@ -320,7 +321,8 @@ public synchronized void start() throws PulsarServerException { if (debug) { log.info("Successfully started the channel tableview."); } - pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent); + cancelSessionListener = + pulsar.getLocalMetadataStore().registerCancellableSessionListener(this::handleMetadataSessionEvent); if (debug) { log.info("Successfully registered the handleMetadataSessionEvent"); } @@ -367,6 +369,10 @@ public synchronized void close() throws PulsarServerException { channelState = Closed; try { leaderElectionService = null; + if (cancelSessionListener != null) { + cancelSessionListener.run(); + cancelSessionListener = null; + } if (tableview != null) { tableview.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java index 3781c9c95f6a7..2d120ed4bb241 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/AntiAffinityGroupPolicyHelper.java @@ -59,10 +59,10 @@ public boolean hasAntiAffinityGroupPolicy(String bundle) { } } - public void listenFailureDomainUpdate() { + public Runnable listenFailureDomainUpdate() { LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap); // register listeners for domain changes - pulsar.getPulsarResources().getClusterResources().getFailureDomainResources() + return pulsar.getPulsarResources().getClusterResources().getFailureDomainResources() .registerListener(__ -> { pulsar.getLoadManagerExecutor().execute(() -> LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 75c60e2687942..ed5256517464d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -190,7 +190,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager { private Map bundleBrokerAffinityMap; // array used for sorting and select topK bundles private final List> bundleArr = new ArrayList<>(); - + private volatile Runnable cancelMetadataListener; + private volatile Runnable cancelSessionListener; + private Runnable cancelFailureDomainResourcesListener; /** * Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called. @@ -233,8 +235,10 @@ public void initialize(final PulsarService pulsar) { this.pulsar = pulsar; this.pulsarResources = pulsar.getPulsarResources(); brokersData = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class); - pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification); - pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent); + cancelMetadataListener = + pulsar.getLocalMetadataStore().registerCancellableListener(this::handleDataNotification); + cancelSessionListener = + pulsar.getLocalMetadataStore().registerCancellableSessionListener(this::handleMetadataSessionEvent); if (SystemUtils.IS_OS_LINUX) { brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsar); @@ -260,11 +264,13 @@ public void initialize(final PulsarService pulsar) { LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap); // register listeners for domain changes - pulsarResources.getClusterResources().getFailureDomainResources() - .registerListener(__ -> { - executors.execute( - () -> LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap)); - }); + cancelFailureDomainResourcesListener = + pulsarResources.getClusterResources().getFailureDomainResources() + .registerListener(__ -> { + executors.execute( + () -> LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, + brokerToFailureDomainMap)); + }); if (placementStrategy instanceof LoadSheddingStrategy) { // if the placement strategy is also a load shedding strategy @@ -1020,6 +1026,19 @@ public void start() throws PulsarServerException { */ @Override public void stop() throws PulsarServerException { + if (cancelMetadataListener != null) { + cancelMetadataListener.run(); + cancelMetadataListener = null; + } + if (cancelSessionListener != null) { + cancelSessionListener.run(); + cancelSessionListener = null; + } + if (cancelFailureDomainResourcesListener != null) { + cancelFailureDomainResourcesListener.run(); + cancelFailureDomainResourcesListener = null; + } + executors.shutdownNow(); try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 30a7359ce0eb8..ecb03a32c2627 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -186,6 +186,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer updateRankingHandle; private Map bundleBrokerAffinityMap; + private Runnable cancelMetadataStoreListener; // Perform initializations which may be done without a PulsarService. public SimpleLoadManagerImpl() { @@ -234,7 +235,7 @@ public void initialize(final PulsarService pulsar) { lastLoadReport.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics()); loadReports = pulsar.getCoordinationService().getLockManager(LoadReport.class); - pulsar.getLocalMetadataStore().registerListener(this); + cancelMetadataStoreListener = pulsar.getLocalMetadataStore().registerCancellableListener(this); this.dynamicConfigurationCache = pulsar.getLocalMetadataStore().getMetadataCache( new TypeReference>() { }); @@ -1437,6 +1438,9 @@ public String setNamespaceBundleAffinity(String bundle, String broker) { @Override public void stop() throws PulsarServerException { try { + if (cancelMetadataStoreListener != null) { + cancelMetadataStoreListener.run(); + } if (loadReports != null) { loadReports.close(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 32fe4ca14493f..10c1c14ff7796 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1851,5 +1851,6 @@ public void close() { LOG.warn("Error shutting down namespace client for cluster {}", cluster, e); } }); + bundleFactory.close(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java index 4a5b8a8bcc244..a902cfec1f854 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListener.java @@ -48,19 +48,20 @@ * @see Global-quotas * */ -public class ResourceGroupConfigListener implements Consumer { +public class ResourceGroupConfigListener implements Consumer, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupConfigListener.class); private final ResourceGroupService rgService; private final PulsarService pulsarService; private final ResourceGroupResources rgResources; + private final Runnable cancelMetadataStoreListener; private volatile ResourceGroupNamespaceConfigListener rgNamespaceConfigListener; public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) { this.rgService = rgService; this.pulsarService = pulsarService; this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources(); - this.rgResources.getStore().registerListener(this); + cancelMetadataStoreListener = this.rgResources.getStore().registerCancellableListener(this); execute(() -> loadAllResourceGroupsWithRetryAsync(0)); } @@ -187,4 +188,12 @@ protected void schedule(Runnable runnable, long delayMs) { ResourceGroupNamespaceConfigListener getRgNamespaceConfigListener() { return rgNamespaceConfigListener; } + + @Override + public void close() throws Exception { + cancelMetadataStoreListener.run(); + if (rgNamespaceConfigListener != null) { + rgNamespaceConfigListener.close(); + } + } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java index e19034c525761..15d2baa16a9ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupNamespaceConfigListener.java @@ -39,7 +39,7 @@ * @see Global-quotas * */ -public class ResourceGroupNamespaceConfigListener implements Consumer { +public class ResourceGroupNamespaceConfigListener implements Consumer, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupNamespaceConfigListener.class); private final ResourceGroupService rgService; @@ -47,6 +47,7 @@ public class ResourceGroupNamespaceConfigListener implements Consumer> owningTopics = new ConcurrentHashMap<>(); + private final Runnable cancelLocalMetadataStoreListener; + private final Runnable cancelConfigurationMetadataStoreListener; private long numberOfNamespaceBundles = 0; private final EventLoopGroup acceptorGroup; @@ -358,9 +360,13 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws pulsar.getConfiguration(), pulsar().getPulsarResources()); this.entryFilterProvider = new EntryFilterProvider(pulsar.getConfiguration()); - pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges); + cancelLocalMetadataStoreListener = + pulsar.getLocalMetadataStore().registerCancellableListener(this::handleMetadataChanges); if (pulsar.getConfigurationMetadataStore() != pulsar.getLocalMetadataStore()) { - pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); + cancelConfigurationMetadataStoreListener = + pulsar.getConfigurationMetadataStore().registerCancellableListener(this::handleMetadataChanges); + } else { + cancelConfigurationMetadataStoreListener = null; } this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder() @@ -851,6 +857,13 @@ public CompletableFuture closeAsync() { entryFilterProvider.close(); } + if (cancelLocalMetadataStoreListener != null) { + cancelLocalMetadataStoreListener.run(); + } + if (cancelConfigurationMetadataStoreListener != null) { + cancelConfigurationMetadataStoreListener.run(); + } + CompletableFuture> cancellableDownstreamFutureReference = new CompletableFuture<>(); log.info("Event loops shutting down gracefully..."); List> shutdownEventLoops = new ArrayList<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 69f5208ce6711..7210a06a8d5df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -61,7 +61,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NamespaceBundleFactory { +public class NamespaceBundleFactory implements AutoCloseable{ private static final Logger LOG = LoggerFactory.getLogger(NamespaceBundleFactory.class); private final HashFunction hashFunc; @@ -74,6 +74,7 @@ public class NamespaceBundleFactory { private final TopicBundleAssignmentStrategy topicBundleAssignmentStrategy; private final Duration maxRetryDuration = Duration.ofSeconds(10); + private final Runnable cancelMetadataStoreListener; public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { this.hashFunc = hashFunc; @@ -84,7 +85,8 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { CacheMetricsCollector.CAFFEINE.addCache("bundles", this.bundlesCache); - pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); + cancelMetadataStoreListener = + pulsar.getLocalMetadataStore().registerCancellableListener(this::handleMetadataStoreNotification); this.pulsar = pulsar; @@ -425,4 +427,9 @@ public static Range getRange(Long lowerEndpoint, Long upperEndpoint) { (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN); } + @Override + public void close() { + cancelMetadataStoreListener.run(); + CacheMetricsCollector.CAFFEINE.removeCache("bundles"); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 663a6f0a41900..37b92fa3b452b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -255,6 +255,7 @@ public void cleanup() throws Exception { pulsar.getConfiguration().setBrokerShutdownTimeoutMs(0); adminTls.close(); otheradmin.close(); + bundleFactory.close(); super.internalCleanup(); mockPulsarSetup.cleanup(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 094a29c1c6c53..d3a2a45c2acfd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -1082,7 +1082,7 @@ private void waitResourceDataUpdateToZK(LoadManager loadManager) throws Exceptio public CompletableFuture registryBrokerDataChangeNotice() { CompletableFuture completableFuture = new CompletableFuture<>(); String brokerDataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getBrokerId(); - pulsar.getLocalMetadataStore().registerListener(notice -> { + pulsar.getLocalMetadataStore().registerCancellableListener(notice -> { if (brokerDataPath.equals(notice.getPath())){ if (!completableFuture.isDone()) { completableFuture.complete(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java index 86b1c1c4b8dd8..1341c6ff08ba0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java @@ -251,7 +251,7 @@ public void testResourceGroupCreateMany() throws Exception { public void testResourceGroupUpdateLoop() throws PulsarAdminException { ResourceGroup zooRg = new ResourceGroup(); - pulsar.getPulsarResources().getResourcegroupResources().getStore().registerListener( + pulsar.getPulsarResources().getResourcegroupResources().getStore().registerCancellableListener( notification -> { String notifyPath = notification.getPath(); if (!ResourceGroupResources.isResourceGroupPath(notifyPath)) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java index 41fe580ce2be4..4a9b0c9ad7260 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java @@ -71,6 +71,7 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr protected Object localMetaDataStoreClientCnx; protected final AtomicBoolean connectionTerminationThreadKeepRunning = new AtomicBoolean(); private volatile Thread connectionTerminationThread; + private Runnable cancelSessionListener; protected void startZKAndBK() throws Exception { // Start ZK. @@ -90,7 +91,7 @@ protected void startBrokers() throws Exception { broker = pulsar.getBrokerService(); ZKMetadataStore zkMetadataStore = (ZKMetadataStore) pulsar.getLocalMetadataStore(); localZkOfBroker = zkMetadataStore.getZkClient(); - zkMetadataStore.registerSessionListener(n -> { + cancelSessionListener = zkMetadataStore.registerCancellableSessionListener(n -> { log.info("Received session event: {}", n); sessionEvent = n; }); @@ -227,6 +228,11 @@ protected void cleanup() throws Exception { markCurrentSetupNumberCleaned(); log.info("--- Shutting down ---"); + if (cancelSessionListener != null) { + cancelSessionListener.run(); + cancelSessionListener = null; + } + stopLocalMetadataStoreConnectionTermination(); // Stop brokers. diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java index f0ec8f52375f3..4cd537c11e8dc 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java @@ -156,12 +156,28 @@ default CompletableFuture deleteIfExists(String path, Optional expec CompletableFuture deleteRecursive(String path); /** - * Register a listener that will be called on changes in the underlying store. + * Register a listener that will be called on changes in the underlying metadata store. * - * @param listener - * a consumer of notifications + * @param listener A Consumer that handles notifications. The consumer will receive a {@link Notification} + * object containing details about what changed in the metadata store. + * @return A Runnable that can be used to stop receiving notifications and unregister the listener. + * Executing this Runnable will cancel the subscription. */ - void registerListener(Consumer listener); + Runnable registerCancellableListener(Consumer listener); + + /** + * Register a listener that will be called on changes in the underlying metadata store. + * This method is deprecated in favor of {@link #registerCancellableListener(Consumer)} which provides better + * lifecycle management through its return value. + * + * @param listener A Consumer that handles notifications about metadata store changes + * @deprecated Use {@link #registerCancellableListener(Consumer)} instead as it provides a way + * to properly unregister the listener when no longer needed + */ + @Deprecated + default void registerListener(Consumer listener) { + registerCancellableListener(listener); + } /** * Create a metadata cache specialized for a specific class. diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java index 182c14ef601a4..f58b4c9ebea07 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java @@ -72,8 +72,23 @@ CompletableFuture put(String path, byte[] value, Optional expectedVe * * @param listener * the session listener + * @return a runnable that can be used to unregister the listener */ - void registerSessionListener(Consumer listener); + Runnable registerCancellableSessionListener(Consumer listener); + + /** + * Register a session listener that will get notified of changes in status of the current session. + * This method is deprecated. Use {@link #registerCancellableSessionListener(Consumer)} instead. + * + * @param listener + * the session listener + * @deprecated Use {@link #registerCancellableSessionListener(Consumer)} which provides + * the ability to unregister the listener + */ + @Deprecated + default void registerSessionListener(Consumer listener) { + registerCancellableSessionListener(listener); + } /** * Get {@link MetadataEventSynchronizer} to notify and synchronize metadata events. @@ -95,4 +110,4 @@ default void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchroni default CompletableFuture handleMetadataEvent(MetadataEvent event) { return CompletableFuture.completedFuture(null); } -} +} \ No newline at end of file diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java index 44870ed47f05b..8954ccad6b657 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java @@ -37,6 +37,7 @@ public class PulsarLedgerAuditorManager implements LedgerAuditorManager { private final CoordinationService coordinationService; private final LeaderElection leaderElection; + private final Runnable cancelSessionListener; private LeaderElectionState leaderElectionState; private String bookieId; private boolean sessionExpired = false; @@ -49,7 +50,7 @@ public class PulsarLedgerAuditorManager implements LedgerAuditorManager { this.leaderElection = coordinationService.getLeaderElection(String.class, electionPath, this::handleStateChanges); this.leaderElectionState = LeaderElectionState.NoLeader; - store.registerSessionListener(event -> { + cancelSessionListener = store.registerCancellableSessionListener(event -> { if (SessionEvent.SessionLost == event) { synchronized (this) { sessionExpired = true; @@ -106,6 +107,7 @@ public BookieId getCurrentAuditor() { @Override public void close() throws Exception { + cancelSessionListener.run(); leaderElection.close(); coordinationService.close(); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java index b003c656353c0..3f7f78536b7f4 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java @@ -88,7 +88,7 @@ public LedgerMetadata deserialize(String path, byte[] content, Stat stat) throws } }); - store.registerListener(this::handleDataNotification); + store.registerCancellableListener(this::handleDataNotification); } private static Throwable mapToBkException(Throwable ex) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java index 2673328b81139..d0c2cc75e32b5 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java @@ -148,7 +148,7 @@ public PulsarLedgerUnderreplicationManager(AbstractConfiguration conf, Metada replicasCheckCtimePath = basePath + '/' + BookKeeperConstants.REPLICAS_CHECK_CTIME; this.store = store; - store.registerListener(this::handleNotification); + store.registerCancellableListener(this::handleNotification); checkLayout(); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java index f2776062818d8..842d4eacaf29c 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarMetadataClientDriver.java @@ -37,6 +37,8 @@ public class PulsarMetadataClientDriver extends AbstractMetadataDriver implement MetadataDrivers.registerClientDriver(METADATA_STORE_SCHEME, PulsarMetadataClientDriver.class); } + private Runnable cancelSessionListener; + public static void init() { // cause to be invoked } @@ -66,11 +68,23 @@ public LayoutManager getLayoutManager() { } @Override - public void setSessionStateListener(SessionStateListener sessionStateListener) { - store.registerSessionListener(event -> { + public synchronized void setSessionStateListener(SessionStateListener sessionStateListener) { + if (cancelSessionListener != null) { + cancelSessionListener.run(); + } + cancelSessionListener = store.registerCancellableSessionListener(event -> { if (event == SessionEvent.SessionLost) { sessionStateListener.onSessionExpired(); } }); } + + @Override + public synchronized void close() { + if (cancelSessionListener != null) { + cancelSessionListener.run(); + cancelSessionListener = null; + } + super.close(); + } } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index 89dbf2be990b0..cda3be4e24e9f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java @@ -70,6 +70,8 @@ public class PulsarRegistrationClient implements RegistrationClient { private final Map> writableBookieInfo; private final Map> readOnlyBookieInfo; private final FutureUtil.Sequencer sequencer; + private final Runnable cancelMetadataStoreListener; + private final Runnable cancelSessionListener; private SessionEvent lastMetadataSessionEvent; public PulsarRegistrationClient(MetadataStore store, @@ -90,12 +92,14 @@ public PulsarRegistrationClient(MetadataStore store, this.executor = Executors .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client")); - store.registerListener(this::updatedBookies); - this.store.registerSessionListener(this::refreshBookies); + cancelMetadataStoreListener = store.registerCancellableListener(this::updatedBookies); + cancelSessionListener = this.store.registerCancellableSessionListener(this::refreshBookies); } @Override public void close() { + cancelMetadataStoreListener.run(); + cancelSessionListener.run(); executor.shutdownNow(); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java index ab35eb7040c10..63a4ad4b40bd9 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java @@ -55,6 +55,8 @@ class LeaderElectionImpl implements LeaderElection { private final MetadataCache cache; private final Consumer stateChangesListener; private final ScheduledFuture updateCachedValueFuture; + private final Runnable cancelMetadataListener; + private final Runnable cancelSessionListener; private LeaderElectionState leaderElectionState; private Optional version = Optional.empty(); @@ -86,8 +88,8 @@ private enum InternalState { this.stateChangesListener = stateChangesListener; this.executor = executor; this.sequencer = FutureUtil.Sequencer.create(); - store.registerListener(this::handlePathNotification); - store.registerSessionListener(this::handleSessionNotification); + cancelMetadataListener = store.registerCancellableListener(this::handlePathNotification); + cancelSessionListener = store.registerCancellableSessionListener(this::handleSessionNotification); updateCachedValueFuture = executor.scheduleWithFixedDelay(this::getLeaderValue, metadataCacheConfig.getRefreshAfterWriteMillis() / 2, metadataCacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS); @@ -269,6 +271,8 @@ public synchronized CompletableFuture asyncClose() { } internalState = InternalState.Closed; + cancelMetadataListener.run(); + cancelSessionListener.run(); if (leaderElectionState != LeaderElectionState.Leading) { return CompletableFuture.completedFuture(null); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java index b6b5c57ccea39..180b3fd03c929 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java @@ -53,6 +53,8 @@ class LockManagerImpl implements LockManager { private final MetadataSerde serde; private final FutureUtil.Sequencer sequencer; private final ScheduledExecutorService executor; + private Runnable cancelSessionListener; + private Runnable cancelMetadataListener; private enum State { Ready, Closed @@ -72,8 +74,8 @@ private enum State { this.serde = serde; this.executor = executor; this.sequencer = FutureUtil.Sequencer.create(); - store.registerSessionListener(this::handleSessionEvent); - store.registerListener(this::handleDataNotification); + cancelSessionListener = store.registerCancellableSessionListener(this::handleSessionEvent); + cancelMetadataListener = store.registerCancellableListener(this::handleDataNotification); } @Override @@ -167,6 +169,9 @@ public void close() throws Exception { public CompletableFuture asyncClose() { Map> locks; synchronized (this) { + cancelSessionListener.run(); + cancelMetadataListener.run(); + if (state != State.Ready) { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 33ef44d759915..56a7db54d9c34 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -95,7 +95,7 @@ protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTele this.executor = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory( StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName())); - registerListener(this); + registerCancellableListener(this); this.childrenCache = Caffeine.newBuilder() .recordStats() @@ -319,10 +319,17 @@ public final CompletableFuture exists(String path) { } @Override - public void registerListener(Consumer listener) { + public Runnable registerCancellableListener(Consumer listener) { // If the metadata store is closed, do nothing here. if (!isClosed()) { listeners.add(listener); + return () -> { + listeners.remove(listener); + }; + } else { + return () -> { + // Do nothing + }; } } @@ -503,8 +510,17 @@ public final CompletableFuture putInternal(String path, byte[] data, Optio } @Override - public void registerSessionListener(Consumer listener) { - sessionListeners.add(listener); + public Runnable registerCancellableSessionListener(Consumer listener) { + if (!isClosed()) { + sessionListeners.add(listener); + return () -> { + sessionListeners.remove(listener); + }; + } else { + return () -> { + // Do nothing + }; + } } protected void receivedSessionEvent(SessionEvent event) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java index 91cd3754d6916..e0e0a895eb125 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java @@ -146,8 +146,8 @@ public CompletableFuture deleteRecursive(String path) { } @Override - public void registerListener(Consumer listener) { - store.registerListener(listener); + public Runnable registerCancellableListener(Consumer listener) { + return store.registerCancellableListener(listener); } @Override @@ -178,9 +178,13 @@ private MetadataCache injectMetadataStoreInMetadataCache(MetadataCache } @Override - public void registerSessionListener(Consumer listener) { - store.registerSessionListener(listener); + public Runnable registerCancellableSessionListener(Consumer listener) { + Runnable unregisterCallback = store.registerCancellableSessionListener(listener); sessionListeners.add(listener); + return () -> { + unregisterCallback.run(); + sessionListeners.remove(listener); + }; } @Override diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index e95f1947740c8..8a8b52520b157 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -94,8 +94,8 @@ public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadata value = new HashSet<>(); } value.forEach(v -> { - registerListener(v); - v.registerListener(this); + registerCancellableListener(v); + v.registerCancellableListener(this); }); value.add(this); return value; diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java index c06fbe3cc07ae..7fe337c2eee31 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/tableview/impl/MetadataStoreTableViewImpl.java @@ -160,7 +160,7 @@ public MetadataStoreTableViewImpl(@NonNull Class clazz, .retryBackoff(MetadataCacheConfig.NO_RETRY_BACKOFF_BUILDER) .asyncReloadConsumer(this::consumeAsyncReload) .build()); - store.registerListener(this::handleNotification); + store.registerCancellableListener(this::handleNotification); if (store instanceof AbstractMetadataStore abstractMetadataStore) { abstractMetadataStore.registerSessionListener(this::handleSessionEvent); } else { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index fade12e07ca3c..4a59cb47a5e9a 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -190,7 +190,7 @@ public void crossStoreUpdates(String provider, Supplier urlSupplier) thr MetadataCacheImpl objCache2 = (MetadataCacheImpl) store2.getMetadataCache(MyClass.class); AtomicReference storeObj = new AtomicReference(); - store2.registerListener(n -> { + store2.registerCancellableListener(n -> { if (n.getType() == NotificationType.Modified) { CompletableFuture.runAsync(() -> { try { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index 2af6760cbc4ad..6fed14f6e27c1 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -326,7 +326,7 @@ public void notificationListeners(String provider, Supplier urlSupplier) MetadataStoreConfig.builder().fsyncEnable(false).build()); BlockingQueue notifications = new LinkedBlockingDeque<>(); - store.registerListener(n -> { + store.registerCancellableListener(n -> { notifications.add(n); }); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java index 0c8869989bc91..565abd9617f1a 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java @@ -53,7 +53,7 @@ public void testDisconnection() throws Exception { .build()); BlockingQueue sessionEvents = new LinkedBlockingQueue<>(); - store.registerSessionListener(sessionEvents::add); + store.registerCancellableSessionListener(sessionEvents::add); zks.stop(); @@ -77,7 +77,7 @@ public void testSessionLost() throws Exception { .build()); BlockingQueue sessionEvents = new LinkedBlockingQueue<>(); - store.registerSessionListener(sessionEvents::add); + store.registerCancellableSessionListener(sessionEvents::add); zks.stop(); @@ -108,7 +108,7 @@ public void testReacquireLocksAfterSessionLost() throws Exception { .build()); BlockingQueue sessionEvents = new LinkedBlockingQueue<>(); - store.registerSessionListener(sessionEvents::add); + store.registerCancellableSessionListener(sessionEvents::add); @Cleanup CoordinationService coordinationService = new CoordinationServiceImpl(store); @@ -147,7 +147,7 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception { .build()); BlockingQueue sessionEvents = new LinkedBlockingQueue<>(); - store.registerSessionListener(sessionEvents::add); + store.registerCancellableSessionListener(sessionEvents::add); BlockingQueue leaderElectionEvents = new LinkedBlockingQueue<>(); String path = newKey(); @@ -194,7 +194,7 @@ public void testElectAfterReconnected() throws Exception { BlockingQueue sessionEvents = new LinkedBlockingQueue<>(); - store.registerSessionListener(sessionEvents::add); + store.registerCancellableSessionListener(sessionEvents::add); BlockingQueue leaderElectionEvents = new LinkedBlockingQueue<>(); String path = newKey(); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java index ac73491a81c65..804810464ddc5 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java @@ -665,7 +665,7 @@ public void testEnableLedgerReplication(String provider, Supplier urlSup final CountDownLatch znodeLatch = new CountDownLatch(2); String urledgerA = StringUtils.substringAfterLast(znodeA, "/"); String urLockLedgerA = basePath + "/locks/" + urledgerA; - store.registerListener(n -> { + store.registerCancellableListener(n -> { if (n.getType() == NotificationType.Created && n.getPath().equals(urLockLedgerA)) { znodeLatch.countDown(); log.debug("Recieved node creation event for the zNodePath:"