Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker] Fix namespace bundle stuck in unloading status (apache#…
Browse files Browse the repository at this point in the history
…21445)

### Motivation

PR apache#21231 made user topic creation rely on system topic `__change_event` if the user is enabling `topicLevelPoliciesEnabled`.

It will introduce a race condition with namespace bundle unloading. All creating topics want to create `__change_event` reader but constantly fail by namespace-bundle inactive and retry mechanism. Unfortunately, the namespace-bundle unloading operation is waiting for all the topics to be completed and then release inactive status. Therefore, they will be stuck in a deadlock until one gets a timeout.


### Modifications

- Get the topic policy before loading.
  • Loading branch information
mattisonchao authored Nov 8, 2023
1 parent f581417 commit 428c18c
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 216 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.MutablePair;
Expand All @@ -43,10 +42,8 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.events.ActionType;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.events.PulsarEvent;
Expand Down Expand Up @@ -320,7 +317,7 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
requireNonNull(namespace);
return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
createSystemTopicClientWithRetry(namespace);
createSystemTopicClient(namespace);
readerCaches.put(namespace, readerCompletableFuture);
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
final CompletableFuture<Void> initFuture = readerCompletableFuture
Expand All @@ -346,20 +343,16 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
});
}

protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry(
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClient(
NamespaceName namespace) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>();
try {
createSystemTopicFactoryIfNeeded();
} catch (PulsarServerException e) {
result.completeExceptionally(e);
return result;
} catch (PulsarServerException ex) {
return FutureUtil.failedFuture(ex);
}
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
final SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(namespace);
Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
RetryUtil.retryAsynchronously(systemTopicClient::newReaderAsync, backoff, pulsarService.getExecutor(), result);
return result;
return systemTopicClient.newReaderAsync();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
});
final String topicPoliciesServiceInitException
= "Topic creation encountered an exception by initialize topic policies service";

// Creating a producer and creating a Consumer may trigger automatic topic
// creation, let's try to create a Producer and a Consumer
Expand All @@ -145,7 +147,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic)));
assertTrue(expected.getMessage().contains(String.format(msg, topic))
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}

try (Consumer<byte[]> ignored = pulsarClient.newConsumer()
Expand All @@ -155,7 +158,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic)));
assertTrue(expected.getMessage().contains(String.format(msg, topic))
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@

import com.google.common.collect.Sets;

import lombok.Cleanup;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand All @@ -34,6 +38,9 @@ public class NamespaceUnloadingTest extends BrokerTestBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setTopicLevelPoliciesEnabled(true);
conf.setForceDeleteNamespaceAllowed(true);
conf.setTopicLoadTimeoutSeconds(Integer.MAX_VALUE);
super.baseSetup();
}

Expand Down Expand Up @@ -68,4 +75,26 @@ public void testUnloadPartiallyLoadedNamespace() throws Exception {
producer.close();
}

@Test
public void testUnloadWithTopicCreation() throws PulsarAdminException, PulsarClientException {
final String namespaceName = "prop/ns_unloading";
final String topicName = "persistent://prop/ns_unloading/with_topic_creation";
final int partitions = 5;
admin.namespaces().createNamespace(namespaceName, 1);
admin.topics().createPartitionedTopic(topicName, partitions);
@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topicName)
.create();

for (int i = 0; i < 100; i++) {
admin.namespaces().unloadNamespaceBundle(namespaceName, "0x00000000_0xffffffff");
}

for (int i = 0; i < partitions; i++) {
producer.send(i);
}
admin.namespaces().deleteNamespace(namespaceName, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception {

// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService()
.loadOrCreatePersistentTopic(topicName, true, null);
.loadOrCreatePersistentTopic(topicName, true, null, null);

try {
futureResult.get();
Expand Down Expand Up @@ -1204,7 +1204,7 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex
for (int i = 0; i < 10; i++) {
// try to create topic which should fail as bundle is disable
CompletableFuture<Optional<Topic>> futureResult = pulsar.getBrokerService()
.loadOrCreatePersistentTopic(topicName + "_" + i, false, null);
.loadOrCreatePersistentTopic(topicName + "_" + i, false, null, null);
loadFutures.add(futureResult);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package org.apache.pulsar.broker.service;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
Expand All @@ -44,11 +41,8 @@
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.events.PulsarEvent;
Expand All @@ -57,7 +51,6 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
Expand Down Expand Up @@ -322,28 +315,6 @@ public void testGetPolicyTimeout() throws Exception {
assertTrue("actual:" + cost, cost >= 5000 - 1000);
}

@Test
public void testCreatSystemTopicClientWithRetry() throws Exception {
SystemTopicBasedTopicPoliciesService service =
spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService());
Field field = SystemTopicBasedTopicPoliciesService.class
.getDeclaredField("namespaceEventsSystemTopicFactory");
field.setAccessible(true);
NamespaceEventsSystemTopicFactory factory = spy((NamespaceEventsSystemTopicFactory) field.get(service));
SystemTopicClient<PulsarEvent> client = mock(TopicPoliciesSystemTopicClient.class);
doReturn(client).when(factory).createTopicPoliciesSystemTopicClient(any());
field.set(service, factory);

SystemTopicClient.Reader<PulsarEvent> reader = mock(SystemTopicClient.Reader.class);
// Throw an exception first, create successfully after retrying
doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
.doReturn(CompletableFuture.completedFuture(reader)).when(client).newReaderAsync();

SystemTopicClient.Reader<PulsarEvent> reader1 = service.createSystemTopicClientWithRetry(null).get();

assertEquals(reader1, reader);
}

@Test
public void testGetTopicPoliciesWithRetry() throws Exception {
Field initMapField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public void testNonDurableSubscribe() throws Exception {
properties.setProperty("useTls", "false");

final String topicName = getTopicWithRandomSuffix("non-durable");
admin.topics().createNonPartitionedTopic(topicName);

int numberOfMessages = 10;
@Cleanup("shutdownNow")
Expand Down Expand Up @@ -214,6 +215,7 @@ public void testRead() throws Exception {
properties.setProperty("useTls", "false");

final String topicName = getTopicWithRandomSuffix("reader");
admin.topics().createNonPartitionedTopic(topicName);

int numberOfMessages = 10;
@Cleanup("shutdownNow")
Expand Down Expand Up @@ -263,6 +265,7 @@ public void testEncryption() throws Exception {
properties.setProperty("useTls", "false");

final String topicName = getTopicWithRandomSuffix("encryption");
admin.topics().createNonPartitionedTopic(topicName);
final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/";
final int numberOfMessages = 10;

Expand Down

0 comments on commit 428c18c

Please sign in to comment.