diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4933aee974d08..a7f13870ae055 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1583,67 +1583,69 @@ protected void handleProducer(final CommandProducer cmdProducer) { }); schemaVersionFuture.thenAccept(schemaVersion -> { - topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> { - CompletionStage createInitSubFuture; - if (!Strings.isNullOrEmpty(initialSubscriptionName) - && topic.isPersistent() - && !topic.getSubscriptions().containsKey(initialSubscriptionName)) { - createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) - .thenCompose(isAllowAutoSubscriptionCreation -> { - if (!isAllowAutoSubscriptionCreation) { - return CompletableFuture.failedFuture( - new BrokerServiceException.NotAllowedException( - "Could not create the initial subscription due to" - + " the auto subscription creation is not allowed.")); - } - return topic.createSubscription(initialSubscriptionName, - InitialPosition.Earliest, false, null); - }); - } else { - createInitSubFuture = CompletableFuture.completedFuture(null); - } + topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled) + .thenCompose(future -> topic.takeFirstSnapshotIfNeed(isTxnEnabled)) + .thenAccept(future -> { + CompletionStage createInitSubFuture; + if (!Strings.isNullOrEmpty(initialSubscriptionName) + && topic.isPersistent() + && !topic.getSubscriptions().containsKey(initialSubscriptionName)) { + createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName) + .thenCompose(isAllowAutoSubscriptionCreation -> { + if (!isAllowAutoSubscriptionCreation) { + return CompletableFuture.failedFuture( + new BrokerServiceException.NotAllowedException( + "Could not create the initial subscription due to" + + " the auto subscription creation is not allowed.")); + } + return topic.createSubscription(initialSubscriptionName, + InitialPosition.Earliest, false, null); + }); + } else { + createInitSubFuture = CompletableFuture.completedFuture(null); + } - createInitSubFuture.whenComplete((sub, ex) -> { - if (ex != null) { - final Throwable rc = FutureUtil.unwrapCompletionException(ex); - if (rc instanceof BrokerServiceException.NotAllowedException) { - log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); - if (producerFuture.completeExceptionally(rc)) { - commandSender.sendErrorResponse(requestId, - ServerError.NotAllowedError, rc.getMessage()); + createInitSubFuture.whenComplete((sub, ex) -> { + if (ex != null) { + final Throwable rc = FutureUtil.unwrapCompletionException(ex); + if (rc instanceof BrokerServiceException.NotAllowedException) { + log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", + remoteAddress, rc.getMessage(), initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(rc)) { + commandSender.sendErrorResponse(requestId, + ServerError.NotAllowedError, rc.getMessage()); + } + producers.remove(producerId, producerFuture); + return; + } + String msg = + "Failed to create the initial subscription: " + ex.getCause().getMessage(); + log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", + remoteAddress, msg, initialSubscriptionName, topicName); + if (producerFuture.completeExceptionally(ex)) { + commandSender.sendErrorResponse(requestId, + BrokerServiceException.getClientErrorCode(ex), msg); + } + producers.remove(producerId, producerFuture); + return; } - producers.remove(producerId, producerFuture); - return; - } - String msg = - "Failed to create the initial subscription: " + ex.getCause().getMessage(); - log.warn("[{}] {} initialSubscriptionName: {}, topic: {}", - remoteAddress, msg, initialSubscriptionName, topicName); - if (producerFuture.completeExceptionally(ex)) { + + buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted, + metadata, schemaVersion, epoch, userProvidedProducerName, topicName, + producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture); + }); + }).exceptionally(exception -> { + Throwable cause = exception.getCause(); + log.error("producerId {}, requestId {} : TransactionBuffer recover failed", + producerId, requestId, exception); + if (producerFuture.completeExceptionally(exception)) { commandSender.sendErrorResponse(requestId, - BrokerServiceException.getClientErrorCode(ex), msg); + ServiceUnitNotReadyException.getClientErrorCode(cause), + cause.getMessage()); } producers.remove(producerId, producerFuture); - return; - } - - buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted, - metadata, schemaVersion, epoch, userProvidedProducerName, topicName, - producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture); - }); - }).exceptionally(exception -> { - Throwable cause = exception.getCause(); - log.error("producerId {}, requestId {} : TransactionBuffer recover failed", - producerId, requestId, exception); - if (producerFuture.completeExceptionally(exception)) { - commandSender.sendErrorResponse(requestId, - ServiceUnitNotReadyException.getClientErrorCode(cause), - cause.getMessage()); - } - producers.remove(producerId, producerFuture); - return null; - }); + return null; + }); }); }); return backlogQuotaCheckFuture; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 50a28c7979277..661335685accf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -153,6 +153,13 @@ default void setEntryTimestamp(long entryTimestamp) { */ CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled); + /** + * Take snapshot if needed. + * @param isTxnEnabled isTxnEnabled + * @return a future represents the result of take snapshot operation. + */ + CompletableFuture takeFirstSnapshotIfNeed(boolean isTxnEnabled); + /** * record add-latency. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 3801ac7f3ee82..bb58d1d8825c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -261,6 +261,11 @@ public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture takeFirstSnapshotIfNeed(boolean enableTxn) { + return CompletableFuture.completedFuture(null); + } + @Override public CompletableFuture subscribe(SubscriptionOption option) { return internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 07deb1168072a..1900faf1187d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -845,6 +845,11 @@ public CompletableFuture checkIfTransactionBufferRecoverCompletely(boolean return getTransactionBuffer().checkIfTBRecoverCompletely(isTxnEnabled); } + @Override + public CompletableFuture takeFirstSnapshotIfNeed(boolean isTxnEnabled) { + return getTransactionBuffer().takeFirstSnapshotIfNeed(isTxnEnabled); + } + @Override protected CompletableFuture incrementTopicEpoch(Optional currentEpoch) { long newEpoch = currentEpoch.orElse(-1L) + 1; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java index b379c4d1db10c..07e3aaf4f5969 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java @@ -194,7 +194,12 @@ public interface TransactionBuffer { */ CompletableFuture checkIfTBRecoverCompletely(boolean isTxn); - + /** + * Take snapshot if needed. + * @param enableTxn + * @return a future represents the result of take snapshot operation. + */ + CompletableFuture takeFirstSnapshotIfNeed(boolean enableTxn); long getOngoingTxnCount(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index ae755f0715ee2..338ac24ad3d98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -415,6 +415,11 @@ public CompletableFuture checkIfTBRecoverCompletely(boolean isTxn) { return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture takeFirstSnapshotIfNeed(boolean enableTxn) { + return CompletableFuture.completedFuture(null); + } + @Override public long getOngoingTxnCount() { return this.buffers.values().stream() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index b4662e5fa83ed..a181cea2fa210 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -219,20 +219,11 @@ public CompletableFuture checkIfTBRecoverCompletely(boolean isTxnEnabled) } else { CompletableFuture completableFuture = new CompletableFuture<>(); transactionBufferFuture.thenRun(() -> { - if (checkIfNoSnapshot()) { - snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { - if (changeToReadyStateFromNoSnapshot()) { - timer.newTimeout(TopicTransactionBuffer.this, - takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); - } - completableFuture.complete(null); - }).exceptionally(exception -> { - log.error("Topic {} failed to take snapshot", this.topic.getName()); - completableFuture.completeExceptionally(exception); - return null; - }); - } else { + if (checkIfNoSnapshot() || checkIfReady()) { completableFuture.complete(null); + } else { + completableFuture.completeExceptionally(new BrokerServiceException + .ServiceUnitNotReadyException("TransactionBuffer recover failed")); } }).exceptionally(exception -> { log.error("Topic {}: TransactionBuffer recover failed", this.topic.getName(), exception.getCause()); @@ -243,6 +234,19 @@ public CompletableFuture checkIfTBRecoverCompletely(boolean isTxnEnabled) } } + @Override + public CompletableFuture takeFirstSnapshotIfNeed(boolean enableTxn) { + CompletableFuture completableFuture = new CompletableFuture<>(); + if (enableTxn && checkIfNoSnapshot()) { + this.snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition) + .thenRun(() -> changeToReadyStateFromNoSnapshot()) + .thenAccept(__ -> completableFuture.complete(null)); + } else { + completableFuture.complete(null); + } + return completableFuture; + } + @Override public long getOngoingTxnCount() { return this.ongoingTxns.size(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index d0efc47c49544..3a2ac632f4ba1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -136,6 +136,11 @@ public CompletableFuture checkIfTBRecoverCompletely(boolean isTxn) { return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture takeFirstSnapshotIfNeed(boolean enableTxn) { + return CompletableFuture.completedFuture(null); + } + @Override public long getOngoingTxnCount() { return 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index dea79f391e39a..378bc09141f52 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -343,6 +343,48 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception { assertGetLastMessageId(consumer, expectedLastMessageID1); } + @Test + public void testNormalProductionNoSnapshot() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/testNormalProductionNoSnapshot"; + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + PersistentTopic topicRef = null; + for (int i = 0; i < pulsarServiceList.size(); i++) { + try { + topicRef = (PersistentTopic) pulsarServiceList.get(i) + .getBrokerService().getTopic(topic, true).get().get(); + break; + } catch (Exception e) { + } + } + TopicTransactionBuffer transactionBuffer = (TopicTransactionBuffer) topicRef.getTransactionBuffer(); + + // check the transaction buffer state is NoSnapshot + Assert.assertEquals(transactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot); + + // send 5 original messages. + for (int i = 0; i < 5; i++) { + producer.newMessage().send(); + } + + // check the transaction buffer state is NoSnapshot + Assert.assertEquals(transactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot); + + // create consumer to get last message id, trigger the snapshot + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribe(); + consumer.getLastMessageIds(); + + // check the transaction buffer state is Ready + Assert.assertEquals(transactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot); + } + /** * produce 3 messages and then trigger a ledger switch, * then create a transaction and send a transactional message.