From b7c637c7a06d67d15bf52c4b2244f1f32bffa52d Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 19 Jan 2024 17:52:57 +0800 Subject: [PATCH 01/11] add code. --- .../broker/transaction/TransactionTest.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index eba7f1e8c73c3..b58768a74b9ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.Unpooled; @@ -1908,4 +1909,49 @@ public void testReadCommittedWithCompaction() throws Exception{ Assert.assertEquals(result, List.of("V4", "V5", "V6")); } + public void testReaderStuckWithMarkerMsg() throws Exception{ + final String namespace = "tnx/ns-prechecks"; + final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + admin.topics().createNonPartitionedTopic(topic); + + admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024); + + @Cleanup + Producer producer = this.pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + producer.newMessage().key("K1").value("V1").send(); + + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + producer.newMessage(txn).key("K2").value("V2").send(); + producer.newMessage(txn).key("K3").value("V3").send(); + txn.commit().get(); + + 
producer.newMessage().key("K1").value("V4").send(); + + Transaction txn2 = pulsarClient.newTransaction() + .withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + producer.newMessage(txn2).key("K2").value("V5").send(); + producer.newMessage(txn2).key("K3").value("V6").send(); + txn2.commit().get(); + + @Cleanup + Reader reader = this.pulsarClient.newReader(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .startMessageId(MessageId.earliest) + .readCompacted(true) + .create(); + List result = new ArrayList<>(); + while (reader.hasMessageAvailable()) { + Message receive = reader.readNext(2, TimeUnit.SECONDS); + // we pass the check of hasMessageAvailable, but the readNext is still blocked. + assertNotEquals(receive, null); + result.add(receive.getValue()); + } + Assert.assertEquals(result, List.of("V4", "V5", "V6")); + } } From ab7b7f09c0ac88fc1c2d1d84151cab27987c6725 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 19 Jan 2024 18:19:27 +0800 Subject: [PATCH 02/11] add code. 
--- .../pulsar/broker/service/ServerCnx.java | 72 ++++++++++++++----- .../broker/transaction/TransactionTest.java | 1 - 2 files changed, 55 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index bd4917da3b119..0ed238223c5c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -49,6 +49,7 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; @@ -68,6 +69,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.pulsar.broker.PulsarService; @@ -2190,6 +2192,58 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) } } + private boolean isValidEntry(Entry entry) { + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + return !metadata.hasMarkerType(); + } + + /** + * Get the largest batch index of the last valid entry. + * some entry need to be filtered, such as marker message. 
+ * @param entryFuture + * @param lastPosition + * @param ml + */ + private void readLastValidEntry(CompletableFuture entryFuture, PositionImpl lastPosition, + ManagedLedgerImpl ml) { + ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + if (isValidEntry(entry)) { + entryFuture.complete(entry); + } else { + // check the previous entry + if (lastPosition.getEntryId() > 0) { + readLastValidEntry(entryFuture, PositionImpl.get(lastPosition.getLedgerId(), + lastPosition.getEntryId() - 1), ml); + } else { + // find out the previous ledger + NavigableMap ledgersMap = + ml.getLedgersInfo(); + Long previousLedgerId = ledgersMap.lowerKey(lastPosition.getLedgerId()); + if (previousLedgerId != null) { + readLastValidEntry(entryFuture, PositionImpl.get(previousLedgerId, + ledgersMap.get(previousLedgerId).getEntries() - 1), ml); + } else { + entryFuture.completeExceptionally(new ManagedLedgerException("No valid entry found")); + } + } + } + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + entryFuture.completeExceptionally(exception); + } + + @Override + public String toString() { + return String.format("ServerCnx [%s] get largest batch index when possible", + ServerCnx.this.ctx.channel()); + } + }, null); + } + private void getLargestBatchIndexWhenPossible( Topic topic, PositionImpl lastPosition, @@ -2222,23 +2276,7 @@ private void getLargestBatchIndexWhenPossible( // For a valid position, we read the entry out and parse the batch size from its metadata. 
CompletableFuture entryFuture = new CompletableFuture<>(); - ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() { - @Override - public void readEntryComplete(Entry entry, Object ctx) { - entryFuture.complete(entry); - } - - @Override - public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - entryFuture.completeExceptionally(exception); - } - - @Override - public String toString() { - return String.format("ServerCnx [%s] get largest batch index when possible", - ServerCnx.this.ctx.channel()); - } - }, null); + readLastValidEntry(entryFuture, lastPosition, ml); CompletableFuture batchSizeFuture = entryFuture.thenApply(entry -> { MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index b58768a74b9ce..82e4cba9c2bfb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1952,6 +1952,5 @@ public void testReaderStuckWithMarkerMsg() throws Exception{ assertNotEquals(receive, null); result.add(receive.getValue()); } - Assert.assertEquals(result, List.of("V4", "V5", "V6")); } } From 11c766fcb749199c4c2fde8a423ab428af93b270 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 19 Jan 2024 18:27:53 +0800 Subject: [PATCH 03/11] add code. 
--- .../pulsar/broker/service/ServerCnx.java | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 0ed238223c5c6..b4a153a6762ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2198,36 +2198,43 @@ private boolean isValidEntry(Entry entry) { } /** - * Get the largest batch index of the last valid entry. + * Get the metadata of the largest batch index of the last valid entry. * some entry need to be filtered, such as marker message. * @param entryFuture * @param lastPosition * @param ml */ - private void readLastValidEntry(CompletableFuture entryFuture, PositionImpl lastPosition, + private void readLastValidEntry(CompletableFuture entryFuture, PositionImpl lastPosition, ManagedLedgerImpl ml) { ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { - if (isValidEntry(entry)) { - entryFuture.complete(entry); - } else { - // check the previous entry - if (lastPosition.getEntryId() > 0) { - readLastValidEntry(entryFuture, PositionImpl.get(lastPosition.getLedgerId(), - lastPosition.getEntryId() - 1), ml); + try { + if (isValidEntry(entry)) { + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + entryFuture.complete(metadata); } else { - // find out the previous ledger - NavigableMap ledgersMap = - ml.getLedgersInfo(); - Long previousLedgerId = ledgersMap.lowerKey(lastPosition.getLedgerId()); - if (previousLedgerId != null) { - readLastValidEntry(entryFuture, PositionImpl.get(previousLedgerId, - ledgersMap.get(previousLedgerId).getEntries() - 1), ml); + // check the previous entry + if (lastPosition.getEntryId() > 0) { + 
readLastValidEntry(entryFuture, PositionImpl.get(lastPosition.getLedgerId(), + lastPosition.getEntryId() - 1), ml); } else { - entryFuture.completeExceptionally(new ManagedLedgerException("No valid entry found")); + // find out the previous ledger + NavigableMap ledgersMap = + ml.getLedgersInfo(); + Long previousLedgerId = ledgersMap.lowerKey(lastPosition.getLedgerId()); + if (previousLedgerId != null) { + readLastValidEntry(entryFuture, PositionImpl.get(previousLedgerId, + ledgersMap.get(previousLedgerId).getEntries() - 1), ml); + } else { + entryFuture.completeExceptionally(new ManagedLedgerException("No valid entry found")); + } } } + } finally { + if (entry != null) { + entry.release(); + } } } @@ -2275,13 +2282,11 @@ private void getLargestBatchIndexWhenPossible( } // For a valid position, we read the entry out and parse the batch size from its metadata. - CompletableFuture entryFuture = new CompletableFuture<>(); - readLastValidEntry(entryFuture, lastPosition, ml); + CompletableFuture entryMetaDataFuture = new CompletableFuture<>(); + readLastValidEntry(entryMetaDataFuture, lastPosition, ml); - CompletableFuture batchSizeFuture = entryFuture.thenApply(entry -> { - MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + CompletableFuture batchSizeFuture = entryMetaDataFuture.thenApply(metadata -> { int batchSize = metadata.getNumMessagesInBatch(); - entry.release(); return metadata.hasNumMessagesInBatch() ? batchSize : -1; }); From 12e00af0c4ad1cc195f26a670a521aa8ec07e1b5 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 19 Jan 2024 18:35:57 +0800 Subject: [PATCH 04/11] add code. 
--- .../pulsar/broker/service/ServerCnx.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index b4a153a6762ab..5816c180a42af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2200,23 +2200,24 @@ private boolean isValidEntry(Entry entry) { /** * Get the metadata of the largest batch index of the last valid entry. * some entry need to be filtered, such as marker message. - * @param entryFuture + * @param entryMetaDataFuture * @param lastPosition * @param ml */ - private void readLastValidEntry(CompletableFuture entryFuture, PositionImpl lastPosition, - ManagedLedgerImpl ml) { + private void readLastValidEntryMetaData(CompletableFuture entryMetaDataFuture, + PositionImpl lastPosition, ManagedLedgerImpl ml) { ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { try { if (isValidEntry(entry)) { + // parse error, need to be fixed. 
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - entryFuture.complete(metadata); + entryMetaDataFuture.complete(metadata); } else { // check the previous entry if (lastPosition.getEntryId() > 0) { - readLastValidEntry(entryFuture, PositionImpl.get(lastPosition.getLedgerId(), + readLastValidEntryMetaData(entryMetaDataFuture, PositionImpl.get(lastPosition.getLedgerId(), lastPosition.getEntryId() - 1), ml); } else { // find out the previous ledger @@ -2224,10 +2225,10 @@ public void readEntryComplete(Entry entry, Object ctx) { ml.getLedgersInfo(); Long previousLedgerId = ledgersMap.lowerKey(lastPosition.getLedgerId()); if (previousLedgerId != null) { - readLastValidEntry(entryFuture, PositionImpl.get(previousLedgerId, + readLastValidEntryMetaData(entryMetaDataFuture, PositionImpl.get(previousLedgerId, ledgersMap.get(previousLedgerId).getEntries() - 1), ml); } else { - entryFuture.completeExceptionally(new ManagedLedgerException("No valid entry found")); + entryMetaDataFuture.completeExceptionally(new ManagedLedgerException("No valid entry found")); } } } @@ -2240,7 +2241,7 @@ public void readEntryComplete(Entry entry, Object ctx) { @Override public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - entryFuture.completeExceptionally(exception); + entryMetaDataFuture.completeExceptionally(exception); } @Override @@ -2283,7 +2284,7 @@ private void getLargestBatchIndexWhenPossible( // For a valid position, we read the entry out and parse the batch size from its metadata. 
CompletableFuture entryMetaDataFuture = new CompletableFuture<>(); - readLastValidEntry(entryMetaDataFuture, lastPosition, ml); + readLastValidEntryMetaData(entryMetaDataFuture, lastPosition, ml); CompletableFuture batchSizeFuture = entryMetaDataFuture.thenApply(metadata -> { int batchSize = metadata.getNumMessagesInBatch(); From 7357c32a684005692fbe344a9b4521cc87a03e9e Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 19 Jan 2024 18:38:46 +0800 Subject: [PATCH 05/11] add code. --- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 5816c180a42af..57d987c430d5c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2192,8 +2192,7 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) } } - private boolean isValidEntry(Entry entry) { - MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + private boolean isValidEntry(MessageMetadata metadata) { return !metadata.hasMarkerType(); } @@ -2210,9 +2209,8 @@ private void readLastValidEntryMetaData(CompletableFuture entry @Override public void readEntryComplete(Entry entry, Object ctx) { try { - if (isValidEntry(entry)) { - // parse error, need to be fixed. 
- MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); + if (isValidEntry(metadata)) { entryMetaDataFuture.complete(metadata); } else { // check the previous entry From 312ef5806d7d84406642b2c9c2c0f6b46c58ad17 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 22 Jan 2024 16:49:41 +0800 Subject: [PATCH 06/11] add test code. --- .../broker/transaction/TransactionTest.java | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 82e4cba9c2bfb..b80f035bb8fe8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1953,4 +1953,64 @@ public void testReaderStuckWithMarkerMsg() throws Exception{ result.add(receive.getValue()); } } + + @Test + public void testReadCommitMarkerStuck() throws Exception{ + final String namespace = "tnx/ns-commit-marker-stuck"; + final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + admin.topics().createNonPartitionedTopic(topic); + + @Cleanup + Producer producer = this.pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + producer.newMessage(txn).key("K1").value("V1").send(); + txn.commit().get(); + + @Cleanup + Reader reader = this.pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + List result = new ArrayList<>(); + while (reader.hasMessageAvailable()) { + Message receive = reader.readNext(2, 
TimeUnit.SECONDS); + assertNotEquals(receive, null); + result.add(receive.getValue()); + } + } + + @Test + public void testReadAbortMarkerStuck() throws Exception{ + final String namespace = "tnx/ns-abort-marker-stuck"; + final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + admin.topics().createNonPartitionedTopic(topic); + + @Cleanup + Producer producer = this.pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + producer.newMessage(txn).key("K1").value("V1").send(); + txn.abort().get(); + + @Cleanup + Reader reader = this.pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + List result = new ArrayList<>(); + while (reader.hasMessageAvailable()) { + Message receive = reader.readNext(2, TimeUnit.SECONDS); + assertNotEquals(receive, null); + result.add(receive.getValue()); + } + } } From 19d7ed9721a04a874776725f3631036173240f9e Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 22 Jan 2024 16:50:42 +0800 Subject: [PATCH 07/11] test code. 
--- .../broker/transaction/TransactionTest.java | 45 ------------------- 1 file changed, 45 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index b80f035bb8fe8..34eb193ec4db6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1909,51 +1909,6 @@ public void testReadCommittedWithCompaction() throws Exception{ Assert.assertEquals(result, List.of("V4", "V5", "V6")); } - public void testReaderStuckWithMarkerMsg() throws Exception{ - final String namespace = "tnx/ns-prechecks"; - final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID(); - admin.namespaces().createNamespace(namespace); - admin.topics().createNonPartitionedTopic(topic); - - admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024); - - @Cleanup - Producer producer = this.pulsarClient.newProducer(Schema.STRING) - .topic(topic) - .create(); - - producer.newMessage().key("K1").value("V1").send(); - - Transaction txn = pulsarClient.newTransaction() - .withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); - producer.newMessage(txn).key("K2").value("V2").send(); - producer.newMessage(txn).key("K3").value("V3").send(); - txn.commit().get(); - - producer.newMessage().key("K1").value("V4").send(); - - Transaction txn2 = pulsarClient.newTransaction() - .withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); - producer.newMessage(txn2).key("K2").value("V5").send(); - producer.newMessage(txn2).key("K3").value("V6").send(); - txn2.commit().get(); - - @Cleanup - Reader reader = this.pulsarClient.newReader(Schema.STRING) - .topic(topic) - .subscriptionName("sub") - .startMessageId(MessageId.earliest) - .readCompacted(true) - .create(); - List result = new ArrayList<>(); 
- while (reader.hasMessageAvailable()) { - Message receive = reader.readNext(2, TimeUnit.SECONDS); - // we pass the check of hasMessageAvailable, but the readNext is still blocked. - assertNotEquals(receive, null); - result.add(receive.getValue()); - } - } - @Test public void testReadCommitMarkerStuck() throws Exception{ final String namespace = "tnx/ns-commit-marker-stuck"; From 00f2ceef3876a7240d13819cf9152e88418284e0 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 22 Jan 2024 17:29:34 +0800 Subject: [PATCH 08/11] fix. --- .../pulsar/broker/service/ServerCnx.java | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 57d987c430d5c..0b738d698378f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2197,14 +2197,15 @@ private boolean isValidEntry(MessageMetadata metadata) { } /** - * Get the metadata of the largest batch index of the last valid entry. + * Get the metadata and position of the largest batch index of the last valid entry. * some entry need to be filtered, such as marker message. 
* @param entryMetaDataFuture * @param lastPosition * @param ml */ - private void readLastValidEntryMetaData(CompletableFuture entryMetaDataFuture, - PositionImpl lastPosition, ManagedLedgerImpl ml) { + private void readLastValidEntry(CompletableFuture entryMetaDataFuture, + CompletableFuture lastPositionFuture, + PositionImpl lastPosition, ManagedLedgerImpl ml) { ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { @@ -2212,21 +2213,23 @@ public void readEntryComplete(Entry entry, Object ctx) { MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); if (isValidEntry(metadata)) { entryMetaDataFuture.complete(metadata); + lastPositionFuture.complete(lastPosition); } else { // check the previous entry if (lastPosition.getEntryId() > 0) { - readLastValidEntryMetaData(entryMetaDataFuture, PositionImpl.get(lastPosition.getLedgerId(), - lastPosition.getEntryId() - 1), ml); + readLastValidEntry(entryMetaDataFuture, lastPositionFuture, + PositionImpl.get(lastPosition.getLedgerId(), lastPosition.getEntryId() - 1), ml); } else { // find out the previous ledger NavigableMap ledgersMap = ml.getLedgersInfo(); Long previousLedgerId = ledgersMap.lowerKey(lastPosition.getLedgerId()); if (previousLedgerId != null) { - readLastValidEntryMetaData(entryMetaDataFuture, PositionImpl.get(previousLedgerId, - ledgersMap.get(previousLedgerId).getEntries() - 1), ml); + readLastValidEntry(entryMetaDataFuture, lastPositionFuture, + PositionImpl.get(previousLedgerId, ledgersMap.get(previousLedgerId).getEntries() - 1), ml); } else { entryMetaDataFuture.completeExceptionally(new ManagedLedgerException("No valid entry found")); + lastPositionFuture.completeExceptionally(new ManagedLedgerException("No valid entry found")); } } } @@ -2282,36 +2285,33 @@ private void getLargestBatchIndexWhenPossible( // For a valid position, we read the entry out and parse the batch size from its metadata. 
CompletableFuture entryMetaDataFuture = new CompletableFuture<>(); - readLastValidEntryMetaData(entryMetaDataFuture, lastPosition, ml); + CompletableFuture lastPositionFuture = new CompletableFuture<>(); + readLastValidEntry(entryMetaDataFuture, lastPositionFuture, lastPosition, ml); - CompletableFuture batchSizeFuture = entryMetaDataFuture.thenApply(metadata -> { - int batchSize = metadata.getNumMessagesInBatch(); - return metadata.hasNumMessagesInBatch() ? batchSize : -1; - }); - - batchSizeFuture.whenComplete((batchSize, e) -> { - if (e != null) { - if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) { - handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex, - markDeletePosition); - } else { - writeAndFlush(Commands.newError( - requestId, ServerError.MetadataError, - "Failed to get batch size for entry " + e.getMessage())); - } + entryMetaDataFuture.thenCombine(lastPositionFuture, (metadata, position) -> { + int largestBatchIndex = -1; + if (metadata.hasNumMessagesInBatch()) { + largestBatchIndex = metadata.getNumMessagesInBatch() - 1; + } + if (log.isDebugEnabled()) { + log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, + topic.getName(), subscriptionName, lastPosition, partitionIndex); + } + writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, position.getLedgerId(), + position.getEntryId(), partitionIndex, largestBatchIndex, + markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, + markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + return null; + }).exceptionally(ex1 -> { + if (ex1.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) { + handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex, + markDeletePosition); } else { - int largestBatchIndex = batchSize > 0 ? 
batchSize - 1 : -1; - - if (log.isDebugEnabled()) { - log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, - topic.getName(), subscriptionName, lastPosition, partitionIndex); - } - - writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(), - lastPosition.getEntryId(), partitionIndex, largestBatchIndex, - markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, - markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); + writeAndFlush(Commands.newError( + requestId, ServerError.MetadataError, + "Failed to get batch size for entry " + ex1.getMessage())); } + return null; }); }); } From 56c5967e79a82d0be91920ff3126e2bf9b790e64 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Mon, 22 Jan 2024 17:54:09 +0800 Subject: [PATCH 09/11] add code. --- .../mledger/ManagedLedgerException.java | 6 +++ .../pulsar/broker/service/ServerCnx.java | 40 ++++++++++++++----- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 1fa565d6ec788..3f71c5cdb5f27 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -147,6 +147,12 @@ public NonRecoverableLedgerException(String msg) { } } + public static class NoValidEntryLedgerException extends ManagedLedgerException { + public NoValidEntryLedgerException(String msg) { + super(msg); + } + } + public static class LedgerNotExistException extends NonRecoverableLedgerException { public LedgerNotExistException(String msg) { super(msg); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 
0b738d698378f..d1e0278627e6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -93,6 +93,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.BatchMessageIdImpl; @@ -2192,8 +2193,15 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) } } - private boolean isValidEntry(MessageMetadata metadata) { - return !metadata.hasMarkerType(); + private boolean isValidEntry(MessageMetadata metadata, PersistentTopic persistentTopic) { + if (metadata.hasMarkerType()) { + return false; + } + if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits() && persistentTopic.isTxnAborted( + new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits()), null)) { + return false; + } + return true; } /** @@ -2205,31 +2213,34 @@ private boolean isValidEntry(MessageMetadata metadata) { */ private void readLastValidEntry(CompletableFuture entryMetaDataFuture, CompletableFuture lastPositionFuture, - PositionImpl lastPosition, ManagedLedgerImpl ml) { + PositionImpl lastPosition, ManagedLedgerImpl ml, + PersistentTopic persistentTopic) { ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { try { MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - if (isValidEntry(metadata)) { + if (isValidEntry(metadata, persistentTopic)) { entryMetaDataFuture.complete(metadata); lastPositionFuture.complete(lastPosition); } else { // check the previous entry if 
(lastPosition.getEntryId() > 0) { - readLastValidEntry(entryMetaDataFuture, lastPositionFuture, - PositionImpl.get(lastPosition.getLedgerId(), lastPosition.getEntryId() - 1), ml); + readLastValidEntry(entryMetaDataFuture, lastPositionFuture, PositionImpl.get( + lastPosition.getLedgerId(), lastPosition.getEntryId() - 1), ml, persistentTopic); } else { // find out the previous ledger NavigableMap ledgersMap = ml.getLedgersInfo(); Long previousLedgerId = ledgersMap.lowerKey(lastPosition.getLedgerId()); if (previousLedgerId != null) { - readLastValidEntry(entryMetaDataFuture, lastPositionFuture, - PositionImpl.get(previousLedgerId, ledgersMap.get(previousLedgerId).getEntries() - 1), ml); + readLastValidEntry(entryMetaDataFuture, lastPositionFuture, PositionImpl.get(previousLedgerId, + ledgersMap.get(previousLedgerId).getEntries() - 1), ml, persistentTopic); } else { - entryMetaDataFuture.completeExceptionally(new ManagedLedgerException("No valid entry found")); - lastPositionFuture.completeExceptionally(new ManagedLedgerException("No valid entry found")); + entryMetaDataFuture.completeExceptionally( + new ManagedLedgerException.NoValidEntryLedgerException("No valid entry found")); + lastPositionFuture.completeExceptionally( + new ManagedLedgerException.NoValidEntryLedgerException("No valid entry found")); } } } @@ -2286,7 +2297,7 @@ private void getLargestBatchIndexWhenPossible( // For a valid position, we read the entry out and parse the batch size from its metadata. 
CompletableFuture entryMetaDataFuture = new CompletableFuture<>(); CompletableFuture lastPositionFuture = new CompletableFuture<>(); - readLastValidEntry(entryMetaDataFuture, lastPositionFuture, lastPosition, ml); + readLastValidEntry(entryMetaDataFuture, lastPositionFuture, lastPosition, ml, persistentTopic); entryMetaDataFuture.thenCombine(lastPositionFuture, (metadata, position) -> { int largestBatchIndex = -1; @@ -2306,6 +2317,13 @@ private void getLargestBatchIndexWhenPossible( if (ex1.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) { handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex, markDeletePosition); + } else if (ex1.getCause() instanceof ManagedLedgerException.NoValidEntryLedgerException) { + // return MessageId.earliest to indicate that there is no valid messages to be read + MessageIdImpl messageId = (MessageIdImpl) MessageId.earliest; + writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId.getLedgerId(), + messageId.getEntryId(), partitionIndex, -1, + markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, + markDeletePosition != null ? markDeletePosition.getEntryId() : -1)); } else { writeAndFlush(Commands.newError( requestId, ServerError.MetadataError, From 289ed975c110bbae431dfefa9a3181f213ec7557 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Tue, 23 Jan 2024 11:13:32 +0800 Subject: [PATCH 10/11] add code. 
--- .../org/apache/pulsar/broker/transaction/TransactionTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 87f1754f4748a..c6175d82b24d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1934,6 +1934,7 @@ public void testReadCommitMarkerStuck() throws Exception{ List result = new ArrayList<>(); while (reader.hasMessageAvailable()) { Message receive = reader.readNext(2, TimeUnit.SECONDS); + // we pass the hasMessageAvailable check, but the readNext return null or stuck assertNotEquals(receive, null); result.add(receive.getValue()); } @@ -1964,6 +1965,7 @@ public void testReadAbortMarkerStuck() throws Exception{ List result = new ArrayList<>(); while (reader.hasMessageAvailable()) { Message receive = reader.readNext(2, TimeUnit.SECONDS); + // we pass the hasMessageAvailable check, but the readNext return null or stuck assertNotEquals(receive, null); result.add(receive.getValue()); } From b1a52c26b937c9d2a307b160ed2a749619587846 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Tue, 23 Jan 2024 12:16:56 +0800 Subject: [PATCH 11/11] fix checkstyle. 
--- .../java/org/apache/pulsar/broker/service/ServerCnx.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d1e0278627e6c..10e3ed9adbb8f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2234,8 +2234,9 @@ public void readEntryComplete(Entry entry, Object ctx) { ml.getLedgersInfo(); Long previousLedgerId = ledgersMap.lowerKey(lastPosition.getLedgerId()); if (previousLedgerId != null) { - readLastValidEntry(entryMetaDataFuture, lastPositionFuture, PositionImpl.get(previousLedgerId, - ledgersMap.get(previousLedgerId).getEntries() - 1), ml, persistentTopic); + readLastValidEntry(entryMetaDataFuture, lastPositionFuture, + PositionImpl.get(previousLedgerId, ledgersMap. + get(previousLedgerId).getEntries() - 1), ml, persistentTopic); } else { entryMetaDataFuture.completeExceptionally( new ManagedLedgerException.NoValidEntryLedgerException("No valid entry found"));