Skip to content

Commit 344905f

Browse files
authored
[fix][broker] Avoid recursive update in ConcurrentHashMap during policy cache cleanup (#24939)
1 parent b758283 commit 344905f

File tree

2 files changed

+46
-2
lines changed

2 files changed

+46
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
588588
// Read policies in background
589589
.thenAccept(__ -> readMorePoliciesAsync(reader));
590590
});
591-
initFuture.exceptionally(ex -> {
591+
initFuture.exceptionallyAsync(ex -> {
592592
try {
593593
if (closed.get()) {
594594
return null;
@@ -601,7 +601,7 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
601601
namespace, cleanupEx);
602602
}
603603
return null;
604-
});
604+
}, pulsarService.getExecutor());
605605
// let caller know we've got an exception.
606606
return initFuture;
607607
}).thenApply(__ -> true);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.HashSet;
2626
import java.util.List;
2727
import java.util.Map;
28+
import java.util.Optional;
2829
import java.util.Set;
2930
import java.util.UUID;
3031
import java.util.concurrent.CompletableFuture;
@@ -421,4 +422,47 @@ public void testPrepareInitPoliciesCacheAsyncWhenNamespaceBeingDeleted() throws
421422
service.prepareInitPoliciesCacheAsync(namespaceName).get();
422423
admin.namespaces().deleteNamespace(NAMESPACE5);
423424
}
425+
426+
@Test
427+
public void testCreateNamespaceEventsSystemTopicFactoryException() throws Exception {
428+
final String namespace = "system-topic/namespace-6";
429+
430+
admin.namespaces().createNamespace(namespace);
431+
432+
TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), "topic-1");
433+
434+
SystemTopicBasedTopicPoliciesService service =
435+
Mockito.spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService());
436+
437+
// inject exception when create NamespaceEventsSystemTopicFactory
438+
Mockito.doThrow(new RuntimeException("test exception")).when(service)
439+
.getNamespaceEventsSystemTopicFactory();
440+
441+
CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture;
442+
Optional<TopicPolicies> topicPoliciesOptional;
443+
try {
444+
topicPoliciesFuture =
445+
service.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY);
446+
topicPoliciesOptional = topicPoliciesFuture.join();
447+
Assert.fail();
448+
} catch (Exception e) {
449+
Assert.assertTrue(e.getCause().getMessage().contains("test exception"));
450+
}
451+
452+
Mockito.reset(service);
453+
454+
service.updateTopicPoliciesAsync(topicName, false, false, topicPolicies ->
455+
topicPolicies.setMaxConsumerPerTopic(10)).get();
456+
457+
topicPoliciesFuture =
458+
service.getTopicPoliciesAsync(topicName, TopicPoliciesService.GetType.LOCAL_ONLY);
459+
topicPoliciesOptional = topicPoliciesFuture.join();
460+
461+
Assert.assertNotNull(topicPoliciesOptional);
462+
Assert.assertTrue(topicPoliciesOptional.isPresent());
463+
464+
TopicPolicies topicPolicies = topicPoliciesOptional.get();
465+
Assert.assertNotNull(topicPolicies);
466+
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
467+
}
424468
}

0 commit comments

Comments
 (0)