From 52ffe82b2b08199b593a936173bd70974ad983be Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 25 Dec 2024 16:21:00 +0800 Subject: [PATCH] Make TransactionBuffer's reader use poolmessage to reduce the GC pressure. --- ...sactionBufferSnapshotBaseSystemTopicClient.java | 1 + .../SnapshotSegmentAbortedTxnProcessorImpl.java | 12 ++++++++++-- .../broker/transaction/buffer/impl/TableView.java | 14 +++++++++----- .../SegmentAbortedTxnProcessorTest.java | 2 ++ .../TopicTransactionBufferRecoverTest.java | 4 +++- 5 files changed, 25 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java index 4023cd88bef55..7ba01b09b2790 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java @@ -205,6 +205,7 @@ protected CompletableFuture> newReaderAsyncInternal() { .subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX) .startMessageId(MessageId.earliest) .readCompacted(true) + .poolMessages(true) .createAsync() .thenApply(reader -> { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java index f2ff5d519d8c0..779d083289b70 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java @@ -768,8 +768,16 @@ private CompletableFuture clearAllSnapshotSegments() { try { while (wait(reader.hasMoreEventsAsync(), "has more events")) { final var message = wait(reader.readNextAsync(), "read next"); - if (topic.getName().equals(message.getValue().getTopicName())) { - snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null); + final String topicName; + final String key; + try { + topicName = message.getValue().getTopicName(); + key = message.getKey(); + } finally { + message.release(); + } + if (topic.getName().equals(topicName)) { + snapshotSegmentsWriter.getFuture().get().write(key, null); } } future.complete(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java index 7608a393cc980..40adec7488420 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java @@ -61,12 +61,16 @@ public T readLatest(String topic) throws Exception { final var reader = getReader(topic); while (wait(reader.hasMoreEventsAsync(), "has more events")) { final var msg = wait(reader.readNextAsync(), "read message"); - if (msg.getKey() != null) { - if (msg.getValue() != null) { - snapshots.put(msg.getKey(), msg.getValue()); - } else { - snapshots.remove(msg.getKey()); + try { + if (msg.getKey() != null) { + if (msg.getValue() != null) { + snapshots.put(msg.getKey(), msg.getValue()); + } else { + snapshots.remove(msg.getKey()); + } } + } finally { + msg.release(); } } return snapshots.get(topic); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java index d9ba825f02e93..b9f4c4f632cc0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java @@ -348,6 +348,7 @@ private void verifySnapshotSegmentsSize(String topic, int size) throws Exception .createReader(TopicName.get(topic)).get(); int segmentCount = 0; while (reader.hasMoreEvents()) { + @Cleanup("release") Message message = reader.readNextAsync() .get(5, TimeUnit.SECONDS); if (topic.equals(message.getValue().getTopicName())) { @@ -364,6 +365,7 @@ private void verifySnapshotSegmentsIndexSize(String topic, int size) throws Exce .createReader(TopicName.get(topic)).get(); int indexCount = 0; while (reader.hasMoreEvents()) { + @Cleanup("release") Message message = reader.readNextAsync() .get(5, TimeUnit.SECONDS); if (topic.equals(message.getValue().getTopicName())) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java index 14cc813a17ddd..ccf99936439dd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java @@ -715,7 +715,9 @@ public void testTransactionBufferIndexSystemTopic() throws Exception { indexesWriter.write(SNAPSHOT_INDEX, transactionBufferTransactionBufferSnapshotIndexes); assertTrue(indexesReader.hasMoreEvents()); - transactionBufferTransactionBufferSnapshotIndexes = indexesReader.readNext().getValue(); + @Cleanup("release") + Message message = indexesReader.readNext(); + transactionBufferTransactionBufferSnapshotIndexes = message.getValue(); assertEquals(transactionBufferTransactionBufferSnapshotIndexes.getTopicName(), SNAPSHOT_INDEX); assertEquals(transactionBufferTransactionBufferSnapshotIndexes.getIndexList().size(), 5); assertNull(transactionBufferTransactionBufferSnapshotIndexes.getSnapshot());