Skip to content

Commit

Permalink
[improve][broker] Improve SystemTopicBasedTopicPoliciesService reader…
Browse files Browse the repository at this point in the history
… to reduce GC pressure (#23780)
  • Loading branch information
dao-jun authored Jan 3, 2025
1 parent 3d71c87 commit a6986b1
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,11 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
}
if (hasMore) {
reader.readNextAsync().thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
try {
refreshTopicPoliciesCache(msg);
} finally {
msg.release();
}
if (log.isDebugEnabled()) {
log.debug("[{}] Loop next event reading for system topic.",
reader.getSystemTopic().getTopicName().getNamespaceObject());
Expand Down Expand Up @@ -505,8 +509,12 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader)
}
reader.readNextAsync()
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
notifyListener(msg);
try {
refreshTopicPoliciesCache(msg);
notifyListener(msg);
} finally {
msg.release();
}
})
.whenComplete((__, ex) -> {
if (ex == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ protected CompletableFuture<Reader<PulsarEvent>> newReaderAsyncInternal() {
.subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
.startMessageId(MessageId.earliest)
.readCompacted(true)
.poolMessages(true)
.createAsync()
.thenApply(reader -> {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
Expand Down Expand Up @@ -67,7 +68,9 @@ public static Optional<TopicPolicies> getTopicPoliciesBypassCache(TopicPoliciesS
.newReader();
PulsarEvent event = null;
while (reader.hasMoreEvents()) {
event = reader.readNext().getValue();
@Cleanup("release")
Message<PulsarEvent> message = reader.readNext();
event = message.getValue();
}
return Optional.ofNullable(event).map(e -> e.getTopicPoliciesEvent().getPolicies());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public void testSendAndReceiveNamespaceEvents() throws Exception {
.build();
systemTopicClientForNamespace1.newWriter().write(getEventKey(event), event);
SystemTopicClient.Reader reader = systemTopicClientForNamespace1.newReader();
@Cleanup("release")
Message<PulsarEvent> received = reader.readNext();
log.info("Receive pulsar event from system topic : {}", received.getValue());

Expand All @@ -139,6 +140,7 @@ public void testSendAndReceiveNamespaceEvents() throws Exception {

// test new reader read
SystemTopicClient.Reader reader1 = systemTopicClientForNamespace1.newReader();
@Cleanup("release")
Message<PulsarEvent> received1 = reader1.readNext();
log.info("Receive pulsar event from system topic : {}", received1.getValue());
Assert.assertEquals(received1.getValue(), event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,13 @@ public void testSystemTopicNotCheckExceed() throws Exception {

FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join();
Assert.assertTrue(reader1.hasMoreEvents());
Assert.assertNotNull(reader1.readNext());
Message<?> message = reader1.readNext();
Assert.assertNotNull(message);
message.release();
Assert.assertTrue(reader2.hasMoreEvents());
message = reader2.readNext();
Assert.assertNotNull(reader2.readNext());
message.release();
reader1.close();
reader2.close();
writer1.get().close();
Expand Down

0 comments on commit a6986b1

Please sign in to comment.