From ece31632597d196eadd3ead71520b4cf086d4d40 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 25 Dec 2024 00:40:35 +0800 Subject: [PATCH 1/2] Make SystemTopicBasedTopicPoliciesService's reader use poolmessage to reduce the GC pressure. --- .../SystemTopicBasedTopicPoliciesService.java | 14 +++++++++++--- .../systopic/TopicPoliciesSystemTopicClient.java | 1 + 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index cc3938491e637..0b6bb9da431dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -429,7 +429,11 @@ private void initPolicesCache(SystemTopicClient.Reader reader, Comp } if (hasMore) { reader.readNextAsync().thenAccept(msg -> { - refreshTopicPoliciesCache(msg); + try { + refreshTopicPoliciesCache(msg); + } finally { + msg.release(); + } if (log.isDebugEnabled()) { log.debug("[{}] Loop next event reading for system topic.", reader.getSystemTopic().getTopicName().getNamespaceObject()); @@ -506,8 +510,12 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader reader) } reader.readNextAsync() .thenAccept(msg -> { - refreshTopicPoliciesCache(msg); - notifyListener(msg); + try { + refreshTopicPoliciesCache(msg); + notifyListener(msg); + } finally { + msg.release(); + } }) .whenComplete((__, ex) -> { if (ex == null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java index ea3ac507d1128..6ada9db2967c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java @@ -73,6 +73,7 @@ protected CompletableFuture> newReaderAsyncInternal() { .subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX) .startMessageId(MessageId.earliest) .readCompacted(true) + .poolMessages(true) .createAsync() .thenApply(reader -> { if (log.isDebugEnabled()) { From 850df7b5e99c7ca947f0b569b659c7e5d5f1a933 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 25 Dec 2024 16:46:47 +0800 Subject: [PATCH 2/2] fix tests --- .../apache/pulsar/broker/service/TopicPolicyTestUtils.java | 5 ++++- .../systopic/NamespaceEventsSystemTopicServiceTest.java | 2 ++ .../pulsar/broker/systopic/PartitionedSystemTopicTest.java | 6 +++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java index 9cf688d62edc6..d4275cdfd200f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java @@ -21,6 +21,7 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import lombok.Cleanup; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; @@ -67,7 +68,9 @@ public static Optional getTopicPoliciesBypassCache(TopicPoliciesS .newReader(); PulsarEvent event = null; while (reader.hasMoreEvents()) { - event = reader.readNext().getValue(); + @Cleanup("release") + Message message = reader.readNext(); + event = message.getValue(); } return Optional.ofNullable(event).map(e -> e.getTopicPoliciesEvent().getPolicies()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java index e66140efb32bb..aaa719515c99f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java @@ -129,6 +129,7 @@ public void testSendAndReceiveNamespaceEvents() throws Exception { .build(); systemTopicClientForNamespace1.newWriter().write(getEventKey(event), event); SystemTopicClient.Reader reader = systemTopicClientForNamespace1.newReader(); + @Cleanup("release") Message received = reader.readNext(); log.info("Receive pulsar event from system topic : {}", received.getValue()); @@ -139,6 +140,7 @@ public void testSendAndReceiveNamespaceEvents() throws Exception { // test new reader read SystemTopicClient.Reader reader1 = systemTopicClientForNamespace1.newReader(); + @Cleanup("release") Message received1 = reader1.readNext(); log.info("Receive pulsar event from system topic : {}", received1.getValue()); Assert.assertEquals(received1.getValue(), event); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index e7bfa3278e36d..e31f78665b394 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -338,9 +338,13 @@ public void testSystemTopicNotCheckExceed() throws Exception { FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join(); Assert.assertTrue(reader1.hasMoreEvents()); - Assert.assertNotNull(reader1.readNext()); + Message message = reader1.readNext(); + Assert.assertNotNull(message); + message.release(); Assert.assertTrue(reader2.hasMoreEvents()); + message = reader2.readNext(); Assert.assertNotNull(reader2.readNext()); + message.release(); reader1.close(); reader2.close(); writer1.get().close();