Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker] Fix inconsistent topic policy (apache#21231)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored Sep 26, 2023
1 parent 48d5b9d commit afc9244
Show file tree
Hide file tree
Showing 30 changed files with 478 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.proxy.server.ProxyConfiguration;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
Expand Down Expand Up @@ -193,15 +194,17 @@ protected void setup() throws Exception {
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");
conf.setSuperUserRoles(ImmutableSet.of("client/" + localHostname + "@" + kdc.getRealm()));

super.init();

lookupUrl = new URI(pulsar.getBrokerServiceUrl());

// set admin auth, to verify admin web resources
Map<String, String> clientSaslConfig = new HashMap<>();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));

super.init();

lookupUrl = new URI(pulsar.getBrokerServiceUrl());
log.info("set client jaas section name: PulsarClient");
admin = PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.client.impl.auth.AuthenticationSasl;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -186,7 +187,12 @@ protected void setup() throws Exception {
conf.setAuthenticationProviders(providers);
conf.setClusterName("test");
conf.setSuperUserRoles(ImmutableSet.of("client" + "@" + kdc.getRealm()));

Map<String, String> clientSaslConfig = new HashMap<>();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
conf.setBrokerClientAuthenticationPlugin(AuthenticationSasl.class.getName());
conf.setBrokerClientAuthenticationParameters(ObjectMapperFactory
.getMapper().getObjectMapper().writeValueAsString(clientSaslConfig));
super.init();

lookupUrl = new URI(pulsar.getWebServiceAddress());
Expand All @@ -197,9 +203,6 @@ protected void setup() throws Exception {
.authentication(authSasl));

// set admin auth, to verify admin web resources
Map<String, String> clientSaslConfig = new HashMap<>();
clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient");
clientSaslConfig.put("serverType", "broker");
log.info("set client jaas section name: PulsarClient");
admin = PulsarAdmin.builder()
.serviceHttpUrl(brokerUrl.toString())
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
*/
package org.apache.pulsar.broker.service;

import static java.util.Objects.requireNonNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -54,6 +56,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -78,8 +81,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic

private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
readerCaches = new ConcurrentHashMap<>();
@VisibleForTesting
final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();

final Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap = new ConcurrentHashMap<>();

@VisibleForTesting
final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -219,12 +222,12 @@ public TopicPolicies getTopicPolicies(TopicName topicName,
boolean isGlobal) throws TopicPoliciesCacheNotInitException {
if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
NamespaceName namespace = topicName.getNamespaceObject();
prepareInitPoliciesCache(namespace, new CompletableFuture<>());
prepareInitPoliciesCacheAsync(namespace);
}

MutablePair<TopicPoliciesCacheNotInitException, TopicPolicies> result = new MutablePair<>();
policyCacheInitMap.compute(topicName.getNamespaceObject(), (k, initialized) -> {
if (initialized == null || !initialized) {
if (initialized == null || !initialized.isDone()) {
result.setLeft(new TopicPoliciesCacheNotInitException());
} else {
TopicPolicies topicPolicies =
Expand All @@ -242,6 +245,34 @@ public TopicPolicies getTopicPolicies(TopicName topicName,
}
}

@NotNull
@Override
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName,
boolean isGlobal) {
requireNonNull(topicName);
final CompletableFuture<Void> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
return preparedFuture.thenApply(__ -> {
final TopicPolicies candidatePolicies = isGlobal
? globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName()))
: policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
return Optional.ofNullable(candidatePolicies);
});
}

@NotNull
@Override
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName) {
requireNonNull(topicName);
final CompletableFuture<Void> preparedFuture = prepareInitPoliciesCacheAsync(topicName.getNamespaceObject());
return preparedFuture.thenApply(__ -> {
final TopicPolicies localPolicies = policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
if (localPolicies != null) {
return Optional.of(localPolicies);
}
return Optional.ofNullable(globalPoliciesCache.get(TopicName.get(topicName.getPartitionedTopicName())));
});
}

@Override
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName()));
Expand All @@ -265,39 +296,48 @@ public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicNa

@Override
public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
CompletableFuture<Void> result = new CompletableFuture<>();
NamespaceName namespace = namespaceBundle.getNamespaceObject();
if (NamespaceService.isHeartbeatNamespace(namespace)) {
result.complete(null);
return result;
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
if (readerCaches.get(namespace) != null) {
ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
result.complete(null);
return CompletableFuture.completedFuture(null);
} else {
prepareInitPoliciesCache(namespace, result);
return prepareInitPoliciesCacheAsync(namespace);
}
}
return result;
}

private void prepareInitPoliciesCache(@Nonnull NamespaceName namespace, CompletableFuture<Void> result) {
if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
private @Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
requireNonNull(namespace);
return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
readerCompletableFuture.thenAccept(reader -> {
initPolicesCache(reader, result);
result.thenRun(() -> readMorePolicies(reader));
}).exceptionally(ex -> {
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
cleanCacheAndCloseReader(namespace, false);
result.completeExceptionally(ex);
final CompletableFuture<Void> initFuture = readerCompletableFuture
.thenCompose(reader -> {
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
initPolicesCache(reader, stageFuture);
return stageFuture
// Read policies in background
.thenAccept(__ -> readMorePoliciesAsync(reader));
});
initFuture.exceptionally(ex -> {
try {
log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
cleanCacheAndCloseReader(namespace, false);
} catch (Throwable cleanupEx) {
// Adding this catch to avoid break callback chain
log.error("[{}] Failed to cleanup reader on __change_events topic", namespace, cleanupEx);
}
return null;
});
}
// let caller know we've got an exception.
return initFuture;
});
}

protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry(
Expand Down Expand Up @@ -381,8 +421,7 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
if (log.isDebugEnabled()) {
log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
}
policyCacheInitMap.computeIfPresent(
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);

// replay policy message
policiesCache.forEach(((topicName, topicPolicies) -> {
if (listeners.get(topicName) != null) {
Expand All @@ -395,6 +434,7 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
}
}
}));

future.complete(null);
}
});
Expand All @@ -420,15 +460,21 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean
});
}

private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
/**
* This is an async method for the background reader to continue syncing new messages.
*
* Note: You should not do any blocking call here. because it will affect
* #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method to block loading topic.
*/
private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader) {
reader.readNextAsync()
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
notifyListener(msg);
})
.whenComplete((__, ex) -> {
if (ex == null) {
readMorePolicies(reader);
readMorePoliciesAsync(reader);
} else {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof PulsarClientException.AlreadyClosedException) {
Expand All @@ -437,7 +483,7 @@ private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
log.warn("Read more topic polices exception, read again.", ex);
readMorePolicies(reader);
readMorePoliciesAsync(reader);
}
}
});
Expand Down Expand Up @@ -605,7 +651,7 @@ boolean checkReaderIsCached(NamespaceName namespaceName) {
}

@VisibleForTesting
public Boolean getPoliciesCacheInit(NamespaceName namespaceName) {
public CompletableFuture<Void> getPoliciesCacheInit(NamespaceName namespaceName) {
return policyCacheInitMap.get(namespaceName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
Expand All @@ -31,6 +32,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.jetbrains.annotations.NotNull;

/**
* Topic policies service.
Expand Down Expand Up @@ -109,6 +111,32 @@ default CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetr
return response;
}

/**
* Asynchronously retrieves topic policies.
* This triggers the Pulsar broker's internal client to load policies from the
* system topic `persistent://tenant/namespace/__change_event`.
*
* @param topicName The name of the topic.
* @param isGlobal Indicates if the policies are global.
* @return A CompletableFuture containing an Optional of TopicPolicies.
* @throws NullPointerException If the topicName is null.
*/
@Nonnull
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal);

/**
* Asynchronously retrieves topic policies.
* This triggers the Pulsar broker's internal client to load policies from the
* system topic `persistent://tenant/namespace/__change_event`.
*
* NOTE: If local policies are not available, it will fallback to using topic global policies.
* @param topicName The name of the topic.
* @return A CompletableFuture containing an Optional of TopicPolicies.
* @throws NullPointerException If the topicName is null.
*/
@Nonnull
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName);

/**
* Get policies for a topic without cache async.
* @param topicName topic name
Expand Down Expand Up @@ -162,6 +190,19 @@ public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal)
return null;
}

@NotNull
@Override
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName,
boolean isGlobal) {
return CompletableFuture.completedFuture(Optional.empty());
}

@NotNull
@Override
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@NotNull TopicName topicName) {
return CompletableFuture.completedFuture(Optional.empty());
}

@Override
public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public void initPersistentTopics() throws Exception {
@Override
@BeforeMethod
protected void setup() throws Exception {
conf.setTopicLevelPoliciesEnabled(false);
super.internalSetup();
persistentTopics = spy(PersistentTopics.class);
persistentTopics.setServletContext(new MockServletContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Excep
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
//make sure namespace policy reader is fully started.
Awaitility.await().untilAsserted(()-> {
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()));
assertTrue(policyService.getPoliciesCacheInit(topicName.getNamespaceObject()).isDone());
});

//load the topic.
Expand Down
Loading

0 comments on commit afc9244

Please sign in to comment.