diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java new file mode 100644 index 0000000000000..cd8671b0e6289 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.util; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.common.classification.InterfaceStability; + +@InterfaceStability.Evolving +public class ManagedLedgerImplUtils { + + /** + * Reverse find last valid position one-entry by one-entry. + */ + public static CompletableFuture asyncGetLastValidPosition(final ManagedLedgerImpl ledger, + final Predicate predicate, + final PositionImpl startPosition) { + CompletableFuture future = new CompletableFuture<>(); + if (!ledger.isValidPosition(startPosition)) { + future.complete(startPosition); + } else { + internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future); + } + return future; + } + + private static void internalAsyncReverseFindPositionOneByOne(final ManagedLedgerImpl ledger, + final Predicate predicate, + final PositionImpl position, + final CompletableFuture future) { + ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + final Position position = entry.getPosition(); + try { + if (predicate.test(entry)) { + future.complete(position); + return; + } + PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position); + if (!ledger.isValidPosition(previousPosition)) { + future.complete(previousPosition); + } else { + internalAsyncReverseFindPositionOneByOne(ledger, predicate, + ledger.getPreviousPosition((PositionImpl) position), future); + } + } catch (Exception e) { + future.completeExceptionally(e); + } finally { + entry.release(); + } + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java new file mode 100644 index 0000000000000..f13d23c05296f --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtilsTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.util; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; +import java.nio.charset.StandardCharsets; +import java.util.function.Predicate; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.testng.annotations.Test; + +@Slf4j +public class ManagedLedgerImplUtilsTest extends MockedBookKeeperTestCase { + + @Test + public void testGetLastValidPosition() throws Exception { + final int maxEntriesPerLedger = 5; + + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger); + ManagedLedger ledger = factory.open("testReverseFindPositionOneByOne", managedLedgerConfig); + + String matchEntry = "match-entry"; + String noMatchEntry = "nomatch-entry"; + Predicate predicate = entry -> { + String entryValue = entry.getDataBuffer().toString(UTF_8); + return matchEntry.equals(entryValue); + }; + + // New ledger will return the last position, regardless of whether the conditions are met or not. + Position position = ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, + predicate, (PositionImpl) ledger.getLastConfirmedEntry()).get(); + assertEquals(ledger.getLastConfirmedEntry(), position); + + for (int i = 0; i < maxEntriesPerLedger - 1; i++) { + ledger.addEntry(matchEntry.getBytes(StandardCharsets.UTF_8)); + } + Position lastMatchPosition = ledger.addEntry(matchEntry.getBytes(StandardCharsets.UTF_8)); + for (int i = 0; i < maxEntriesPerLedger; i++) { + ledger.addEntry(noMatchEntry.getBytes(StandardCharsets.UTF_8)); + } + + // Returns last position of entry is "match-entry" + position = ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, + predicate, (PositionImpl) ledger.getLastConfirmedEntry()).get(); + assertEquals(position, lastMatchPosition); + + ledger.close(); + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index a60f1d805ceb6..5ccdbfbe715c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2174,29 +2174,31 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) long requestId = getLastMessageId.getRequestId(); Topic topic = consumer.getSubscription().getTopic(); - topic.checkIfTransactionBufferRecoverCompletely(true).thenRun(() -> { - Position lastPosition = ((PersistentTopic) topic).getMaxReadPosition(); - int partitionIndex = TopicName.getPartitionIndex(topic.getName()); - - Position markDeletePosition = null; - if (consumer.getSubscription() instanceof PersistentSubscription) { - markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() - .getMarkDeletedPosition(); - } - - getLargestBatchIndexWhenPossible( - topic, - (PositionImpl) lastPosition, - (PositionImpl) markDeletePosition, - partitionIndex, - requestId, - consumer.getSubscription().getName(), - consumer.readCompacted()); - }).exceptionally(e -> { - writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), - ServerError.UnknownError, "Failed to recover Transaction Buffer.")); - return null; - }); + topic.checkIfTransactionBufferRecoverCompletely(true) + .thenCompose(__ -> topic.getLastDispatchablePosition()) + .thenApply(lastPosition -> { + int partitionIndex = TopicName.getPartitionIndex(topic.getName()); + + Position markDeletePosition = null; + if (consumer.getSubscription() instanceof PersistentSubscription) { + markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() + .getMarkDeletedPosition(); + } + + getLargestBatchIndexWhenPossible( + topic, + (PositionImpl) lastPosition, + (PositionImpl) markDeletePosition, + partitionIndex, + requestId, + consumer.getSubscription().getName(), + consumer.readCompacted()); + return null; + }).exceptionally(e -> { + writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), + ServerError.UnknownError, "Failed to recover Transaction Buffer.")); + return null; + }); } else { writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found")); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index a296052a41191..37696d7a7c53c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -275,6 +275,13 @@ CompletableFuture asyncGetStats(boolean getPreciseBack Position getLastPosition(); + /** + * Get the last message position that can be dispatch. + */ + default CompletableFuture getLastDispatchablePosition() { + throw new UnsupportedOperationException("getLastDispatchablePosition is not supported by default"); + } + CompletableFuture getLastMessageId(); /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 155b67778820b..95a2b64908a73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -83,6 +83,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl; import org.apache.bookkeeper.mledger.util.Futures; +import org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils; import org.apache.bookkeeper.net.BookieId; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -174,6 +175,7 @@ import org.apache.pulsar.common.policies.data.stats.TopicMetricBean; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.Markers; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaType; @@ -3634,6 +3636,22 @@ public Position getLastPosition() { return ledger.getLastConfirmedEntry(); } + @Override + public CompletableFuture getLastDispatchablePosition() { + PositionImpl maxReadPosition = getMaxReadPosition(); + // If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions. + // so return `maxRedPosition` directly. + if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) { + return CompletableFuture.completedFuture(maxReadPosition); + } else { + return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> { + MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer()); + // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer + return !Markers.isServerOnlyMarker(md); + }, maxReadPosition); + } + } + @Override public CompletableFuture getLastMessageId() { CompletableFuture completableFuture = new CompletableFuture<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 8aeb902211db2..25b09f965498d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -51,6 +51,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; @@ -167,6 +168,82 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception { "messages don't match."); } + /** + * Tests replicated subscriptions across two regions and can read successful. + */ + @Test + public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage"); + String topicName = "persistent://" + namespace + "/mytopic"; + String subscriptionName = "cluster-subscription"; + // this setting can be used to manually run the test with subscription replication disabled + // it shows that subscription replication has no impact in behavior for this test case + boolean replicateSubscriptionState = true; + + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + + // create subscription in r1 + createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState); + + @Cleanup + PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + + // create subscription in r2 + createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState); + + Set sentMessages = new LinkedHashSet<>(); + + // send messages in r1 + @Cleanup + Producer producer = client1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + int numMessages = 6; + for (int i = 0; i < numMessages; i++) { + String body = "message" + i; + producer.send(body.getBytes(StandardCharsets.UTF_8)); + sentMessages.add(body); + } + producer.close(); + + + // consume 3 messages in r1 + Set receivedMessages = new LinkedHashSet<>(); + try (Consumer consumer1 = client1.newConsumer() + .topic(topicName) + .subscriptionName(subscriptionName) + .replicateSubscriptionState(replicateSubscriptionState) + .subscribe()) { + readMessages(consumer1, receivedMessages, 3, false); + } + + // wait for subscription to be replicated + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + // create a reader in r2 + Reader reader = client2.newReader().topic(topicName) + .subscriptionName("new-sub") + .startMessageId(MessageId.earliest) + .create(); + int readNum = 0; + while (reader.hasMessageAvailable()) { + Message message = reader.readNext(10, TimeUnit.SECONDS); + assertNotNull(message); + log.info("Receive message: " + new String(message.getValue()) + " msgId: " + message.getMessageId()); + readNum++; + } + assertEquals(readNum, numMessages); + } + @Test public void testReplicatedSubscribeAndSwitchToStandbyCluster() throws Exception { final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns_"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java new file mode 100644 index 0000000000000..93a22a851f160 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionWithTransactionTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Tests replicated subscriptions with transaction (PIP-33) + */ +@Test(groups = "broker") +public class ReplicatorSubscriptionWithTransactionTest extends ReplicatorSubscriptionTest { + + @Override + @BeforeClass(timeOut = 300000) + public void setup() throws Exception { + config1.setTransactionCoordinatorEnabled(true); + config2.setTransactionCoordinatorEnabled(true); + config3.setTransactionCoordinatorEnabled(true); + config4.setTransactionCoordinatorEnabled(true); + super.setup(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @DataProvider(name = "isTopicPolicyEnabled") + private Object[][] isTopicPolicyEnabled() { + // Todo: fix replication can not be enabled at topic level. + return new Object[][] { { Boolean.FALSE } }; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index fad785cc882ff..b0903b00be380 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -280,9 +280,9 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception { for (int i = 0; i < 3; i++) { expectedLastMessageID = (MessageIdImpl) producer.newMessage().send(); } - assertMessageId(consumer, expectedLastMessageID, 0); + assertMessageId(consumer, expectedLastMessageID); // 2.2 Case2: send 2 ongoing transactional messages and 2 original messages. - // |1:0|1:1|1:2|txn1->1:3|1:4|txn2->1:5|1:6|. + // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|. Transaction txn1 = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.HOURS) .build() @@ -292,18 +292,24 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception { .build() .get(); producer.newMessage(txn1).send(); + // expectedLastMessageID1 == 1:4 MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send(); producer.newMessage(txn2).send(); + // expectedLastMessageID2 == 1:6 MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) producer.newMessage().send(); + // 2.2.1 Last message ID will not change when txn1 and txn2 do not end. - assertMessageId(consumer, expectedLastMessageID, 0); + assertMessageId(consumer, expectedLastMessageID); + // 2.2.2 Last message ID will update to 1:4 when txn1 committed. + // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7| txn1.commit().get(5, TimeUnit.SECONDS); - assertMessageId(consumer, expectedLastMessageID1, 0); + assertMessageId(consumer, expectedLastMessageID1); + // 2.2.3 Last message ID will update to 1:6 when txn2 aborted. + // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|tx2:abort->1:8| txn2.abort().get(5, TimeUnit.SECONDS); - // Todo: We can not ignore the marker's position in this fix. - assertMessageId(consumer, expectedLastMessageID2, 2); + assertMessageId(consumer, expectedLastMessageID2); } /** @@ -362,9 +368,9 @@ private void triggerLedgerSwitch(String topicName) throws Exception{ }); } - private void assertMessageId(Consumer consumer, MessageIdImpl expected, int entryOffset) throws Exception { + private void assertMessageId(Consumer consumer, MessageIdImpl expected) throws Exception { TopicMessageIdImpl actual = (TopicMessageIdImpl) consumer.getLastMessageIds().get(0); - assertEquals(expected.getEntryId(), actual.getEntryId() - entryOffset); + assertEquals(expected.getEntryId(), actual.getEntryId()); assertEquals(expected.getLedgerId(), actual.getLedgerId()); }