Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
*/
@Getter
private boolean metadataServiceAvailable;
private final Runnable cancelSessionListener;

private static class PendingInitializeManagedLedger {

Expand Down Expand Up @@ -243,7 +244,8 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
evictionTaskInterval, evictionTaskInterval, TimeUnit.MILLISECONDS);
closed = false;

metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
cancelSessionListener =
metadataStore.registerCancellableSessionListener(this::handleMetadataStoreNotification);

openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
openTelemetryManagedLedgerStats = new OpenTelemetryManagedLedgerStats(openTelemetry, this);
Expand Down Expand Up @@ -648,6 +650,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}));
}).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
store.close();
cancelSessionListener.run();
openTelemetryManagedCursorStats.close();
openTelemetryManagedLedgerStats.close();
openTelemetryCacheStats.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,11 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn
* @return a future represents the result of the operation.
*/
CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name);

/**
* Close the store.
*/
default void close() {
// Default implementation does nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class MetaStoreImpl implements MetaStore, Consumer<Notification> {

private final MetadataStore store;
private final OrderedExecutor executor;
private final Runnable cancelMetadataStoreListener;

private static final int MAGIC_MANAGED_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
private final MetadataCompressionConfig ledgerInfoCompressionConfig;
Expand All @@ -74,7 +75,9 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
this.cursorInfoCompressionConfig = MetadataCompressionConfig.noCompression;
managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>();
if (store != null) {
store.registerListener(this);
cancelMetadataStoreListener = store.registerCancellableListener(this);
} else {
cancelMetadataStoreListener = null;
}
}

Expand All @@ -87,7 +90,9 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor,
this.cursorInfoCompressionConfig = cursorInfoCompressionConfig;
managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>();
if (store != null) {
store.registerListener(this);
cancelMetadataStoreListener = store.registerCancellableListener(this);
} else {
cancelMetadataStoreListener = null;
}
}

Expand Down Expand Up @@ -548,4 +553,9 @@ private CompressionCodec getCompressionCodec(CompressionType compressionType) {
org.apache.pulsar.common.api.proto.CompressionType.valueOf(compressionType.name()));
}

public void close() {
if (cancelMetadataStoreListener != null) {
cancelMetadataStoreListener.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackChangeNotifier;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping

public static final String BOOKIE_INFO_ROOT_PATH = "/bookies";
public static final String METADATA_STORE_INSTANCE = "METADATA_STORE_INSTANCE";
public static final String ADD_TO_CLEANUP_CONSUMER_INSTANCE = "ADD_TO_CLEANUP_CONSUMER";

private MetadataCache<BookiesRackConfiguration> bookieMappingCache = null;
private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null;
Expand Down Expand Up @@ -122,7 +124,12 @@ public synchronized void setConf(Configuration conf) {
}

bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
store.registerListener(this::handleUpdates);
Runnable cancelMetadataStoreListener = store.registerCancellableListener(this::handleUpdates);
Consumer<Runnable> addToCleanupConsumer =
(Consumer<Runnable>) conf.getProperty(ADD_TO_CLEANUP_CONSUMER_INSTANCE);
if (addToCleanupConsumer != null) {
addToCleanupConsumer.accept(cancelMetadataStoreListener);
}

try {
var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ public void setFailureDomainWithCreate(String clusterName, String domainName,
setWithCreate(joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN, domainName), createFunction);
}

public void registerListener(Consumer<Notification> listener) {
getStore().registerListener(n -> {
public Runnable registerListener(Consumer<Notification> listener) {
return getStore().registerCancellableListener(n -> {
// Prefilter the notification just for failure domains
if (n.getPath().startsWith(BASE_CLUSTERS_PATH)
&& n.getPath().contains("/" + FAILURE_DOMAIN)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public MetadataStoreCacheLoader(PulsarResources pulsarResources, int operationTi
* @throws Exception
*/
public void init() throws Exception {
loadReportResources.getStore().registerListener((n) -> {
loadReportResources.getStore().registerCancellableListener((n) -> {
if (LOADBALANCE_BROKERS_ROOT.equals(n.getPath()) && NotificationType.ChildrenChanged.equals(n.getType())) {
loadReportResources.getChildrenAsync(LOADBALANCE_BROKERS_ROOT).thenApplyAsync((brokerNodes)->{
loadReportResources.getChildrenAsync(LOADBALANCE_BROKERS_ROOT).thenApplyAsync((brokerNodes) -> {
updateBrokerList(brokerNodes).thenRun(() -> {
log.info("Successfully updated broker info {}", brokerNodes);
}).exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class TopicResources {
public TopicResources(MetadataStore store) {
this.store = store;
topicListeners = new ConcurrentHashMap<>();
store.registerListener(this::handleNotification);
store.registerCancellableListener(this::handleNotification);
}

/***
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void setup() {

@Test
public void testConstructorRegistersAsListener() {
verify(metadataStore).registerListener(any());
verify(metadataStore).registerCancellableListener(any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand All @@ -52,6 +55,7 @@

@Slf4j
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
private final List<Runnable> cleanupCallbacks = new CopyOnWriteArrayList<>();

@Override
public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataStoreExtended store,
Expand All @@ -70,6 +74,7 @@ public CompletableFuture<BookKeeper> create(ServiceConfiguration conf, MetadataS
PulsarMetadataClientDriver.init();

ClientConfiguration bkConf = createBkClientConfiguration(store, conf);
bkConf.setProperty(BookieRackAffinityMapping.ADD_TO_CLEANUP_CONSUMER_INSTANCE, getAddToCleanupConsumer());
if (properties != null) {
properties.forEach(bkConf::setProperty);
}
Expand Down Expand Up @@ -241,8 +246,14 @@ static void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfig
}
}

// handles cancelling a Metadata Store listener that was registered in BookieRackAffinityMapping
Consumer<Runnable> getAddToCleanupConsumer() {
return cleanupCallbacks::add;
}

@Override
public void close() {
// Nothing to do
cleanupCallbacks.forEach(Runnable::run);
cleanupCallbacks.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private String brokerId;
private final CompletableFuture<Void> readyForIncomingRequestsFuture = new CompletableFuture<>();
private final List<Runnable> pendingTasksBeforeReadyForIncomingRequests = new ArrayList<>();
private volatile Runnable cancelSessionListener;
private volatile Runnable cancelMetadataStoreListener;

public enum State {
Init, Started, Closing, Closed
Expand Down Expand Up @@ -497,6 +499,15 @@ public CompletableFuture<Void> closeAsync() {
// It only tells the Pulsar clients that this service is not ready to serve for the lookup requests
state = State.Closing;

if (cancelMetadataStoreListener != null) {
cancelMetadataStoreListener.run();
cancelMetadataStoreListener = null;
}
if (cancelSessionListener != null) {
cancelSessionListener.run();
cancelSessionListener = null;
}

if (healthChecker != null) {
healthChecker.close();
healthChecker = null;
Expand Down Expand Up @@ -852,7 +863,8 @@ public void start() throws PulsarServerException {
: null;
localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer,
openTelemetry.getOpenTelemetryService().getOpenTelemetry());
localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);
cancelSessionListener =
localMetadataStore.registerCancellableSessionListener(this::handleMetadataSessionEvent);

coordinationService = new CoordinationServiceImpl(localMetadataStore);

Expand Down Expand Up @@ -1113,7 +1125,8 @@ protected PulsarResources newPulsarResources() {
PulsarResources pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore,
config.getMetadataStoreOperationTimeoutSeconds(), getExecutor());

pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
cancelMetadataStoreListener =
pulsarResources.getClusterResources().getStore().registerCancellableListener(this::handleDeleteCluster);
return pulsarResources;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
private final ScheduledExecutorService scheduler;

private final List<BiConsumer<String, NotificationType>> listeners;
private Runnable cancelMetadataStoreListener;

protected enum State {
Init,
Expand Down Expand Up @@ -111,7 +112,8 @@ public synchronized void start() throws PulsarServerException {
if (!this.state.compareAndSet(State.Init, State.Started)) {
throw new PulsarServerException("Cannot start the broker registry in state " + state.get());
}
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
cancelMetadataStoreListener =
pulsar.getLocalMetadataStore().registerCancellableListener(this::handleMetadataStoreNotification);
try {
this.registerAsync().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
Expand Down Expand Up @@ -236,6 +238,7 @@ public synchronized void close() throws PulsarServerException {
if (this.state.get() == State.Closed) {
return;
}
cancelMetadataStoreListener.run();
try {
this.listeners.clear();
this.unregister();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
private UnloadManager unloadManager;

private SplitManager splitManager;
private volatile Runnable cancelFailureDomainResourcesListener;

enum State {
INIT,
Expand Down Expand Up @@ -383,7 +384,8 @@ public void start() throws PulsarServerException {

this.antiAffinityGroupPolicyHelper =
new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel);
antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
cancelFailureDomainResourcesListener =
antiAffinityGroupPolicyHelper.listenFailureDomainUpdate();
this.antiAffinityGroupPolicyFilter = new AntiAffinityGroupPolicyFilter(antiAffinityGroupPolicyHelper);
this.brokerFilterPipeline.add(antiAffinityGroupPolicyFilter);
SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar);
Expand Down Expand Up @@ -770,6 +772,10 @@ public void close() throws PulsarServerException {
return;
}
try {
if (cancelFailureDomainResourcesListener != null) {
cancelFailureDomainResourcesListener.run();
cancelFailureDomainResourcesListener = null;
}
stopLoadDataReportTasks();
this.unloadScheduler.close();
this.splitScheduler.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private volatile long lastOwnEventHandledAt = 0;
private long lastOwnedServiceUnitCountAt = 0;
private int totalOwnedServiceUnitCnt = 0;
private Runnable cancelSessionListener;

public enum EventType {
Assign,
Expand Down Expand Up @@ -320,7 +321,8 @@ public synchronized void start() throws PulsarServerException {
if (debug) {
log.info("Successfully started the channel tableview.");
}
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
cancelSessionListener =
pulsar.getLocalMetadataStore().registerCancellableSessionListener(this::handleMetadataSessionEvent);
if (debug) {
log.info("Successfully registered the handleMetadataSessionEvent");
}
Expand Down Expand Up @@ -367,6 +369,10 @@ public synchronized void close() throws PulsarServerException {
channelState = Closed;
try {
leaderElectionService = null;
if (cancelSessionListener != null) {
cancelSessionListener.run();
cancelSessionListener = null;
}

if (tableview != null) {
tableview.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public boolean hasAntiAffinityGroupPolicy(String bundle) {
}
}

public void listenFailureDomainUpdate() {
public Runnable listenFailureDomainUpdate() {
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap);
// register listeners for domain changes
pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
return pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
.registerListener(__ -> {
pulsar.getLoadManagerExecutor().execute(() ->
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap));
Expand Down
Loading
Loading