diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b9a8e74b9a48d..7532bee8c12e8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1758,16 +1758,14 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { // Check create persistent topic timeout. log.warn("{} future is already completed with failure {}, closing the" + " topic", topic, FutureUtil.getException(topicFuture)); - persistentTopic.getTransactionBuffer() - .closeAsync() - .exceptionally(t -> { - log.error("[{}] Close transactionBuffer failed", topic, t); - return null; - }); - persistentTopic.stopReplProducers() - .whenCompleteAsync((v, exception) -> { - topics.remove(topic, topicFuture); - }, executor()); + executor().submit(() -> { + persistentTopic.close().whenComplete((ignore, ex) -> { + if (ex != null) { + log.warn("[{}] Get an error when closing topic.", + topic, ex); + } + }); + }); } else { addTopicToStatsMaps(topicName, persistentTopic); topicFuture.complete(Optional.of(persistentTopic)); @@ -1776,16 +1774,15 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { .exceptionally((ex) -> { log.warn("Replication or dedup check failed." + " Removing topic from topics list {}, {}", topic, ex); - persistentTopic.getTransactionBuffer() - .closeAsync() - .exceptionally(t -> { - log.error("[{}] Close transactionBuffer failed", topic, t); - return null; - }); - persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> { - topics.remove(topic, topicFuture); - topicFuture.completeExceptionally(ex); - }, executor()); + executor().submit(() -> { + persistentTopic.close().whenComplete((ignore, closeEx) -> { + if (closeEx != null) { + log.warn("[{}] Get an error when closing topic.", + topic, closeEx); + } + topicFuture.completeExceptionally(ex); + }); + }); return null; }); } catch (PulsarServerException e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java new file mode 100644 index 0000000000000..7cd9da7574dbb --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TopicPoliciesService; +import org.apache.pulsar.broker.service.TopicPolicyListener; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.compaction.CompactionServiceFactory; +import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class OrphanPersistentTopicTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testNoOrphanTopicAfterCreateTimeout() throws Exception { + // Make the topic loading timeout faster. + int topicLoadTimeoutSeconds = 2; + long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds(); + pulsar.getConfig().setTopicLoadTimeoutSeconds(2); + + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + TopicName.get(tpName).getPersistenceNamingEncoding(); + + // Make topic load timeout 5 times. + AtomicInteger timeoutCounter = new AtomicInteger(); + for (int i = 0; i < 5; i++) { + mockZooKeeper.delay(topicLoadTimeoutSeconds * 2 * 1000, (op, path) -> { + if (mlPath.equals(path)) { + log.info("Topic load timeout: " + timeoutCounter.incrementAndGet()); + return true; + } + return false; + }); + } + + // Load topic. + CompletableFuture> consumer = pulsarClient.newConsumer() + .topic(tpName) + .subscriptionName("my-sub") + .subscribeAsync(); + + // After create timeout 5 times, the topic will be created successful. + Awaitility.await().ignoreExceptions().atMost(40, TimeUnit.SECONDS).untilAsserted(() -> { + CompletableFuture> future = pulsar.getBrokerService().getTopic(tpName, false); + assertTrue(future.isDone()); + Optional optional = future.get(); + assertTrue(optional.isPresent()); + }); + + // Assert only one PersistentTopic was not closed. + TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService(); + Map>> listeners = + WhiteboxImpl.getInternalState(topicPoliciesService, "listeners"); + assertEquals(listeners.get(TopicName.get(tpName)).size(), 1); + + // cleanup. + consumer.join().close(); + admin.topics().delete(tpName, false); + pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); + } + + @Test + public void testNoOrphanTopicIfInitFailed() throws Exception { + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(tpName); + + // Load topic. + Consumer consumer = pulsarClient.newConsumer() + .topic(tpName) + .subscriptionName("my-sub") + .subscribe(); + + // Make the method `PersitentTopic.initialize` fail. + Field fieldCompactionServiceFactory = PulsarService.class.getDeclaredField("compactionServiceFactory"); + fieldCompactionServiceFactory.setAccessible(true); + CompactionServiceFactory compactionServiceFactory = + (CompactionServiceFactory) fieldCompactionServiceFactory.get(pulsar); + fieldCompactionServiceFactory.set(pulsar, null); + admin.topics().unload(tpName); + + // Wait for failed to create topic for several times. + Thread.sleep(5 * 1000); + + // Remove the injected error, the topic will be created successful. + fieldCompactionServiceFactory.set(pulsar, compactionServiceFactory); + // We do not know the next time of consumer reconnection, so wait for 2 minutes to avoid flaky. It will be + // very fast in normal. + Awaitility.await().ignoreExceptions().atMost(120, TimeUnit.SECONDS).untilAsserted(() -> { + CompletableFuture> future = pulsar.getBrokerService().getTopic(tpName, false); + assertTrue(future.isDone()); + Optional optional = future.get(); + assertTrue(optional.isPresent()); + }); + + // Assert only one PersistentTopic was not closed. + TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService(); + Map>> listeners = + WhiteboxImpl.getInternalState(topicPoliciesService, "listeners"); + assertEquals(listeners.get(TopicName.get(tpName)).size(), 1); + + // cleanup. + consumer.close(); + admin.topics().delete(tpName, false); + } +} diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java index 0c0f7ec9ed1d4..f32036e53f001 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java @@ -1114,7 +1114,7 @@ Optional programmedFailure(Op op, String path) { Optional failure = failures.stream().filter(f -> f.predicate.test(op, path)).findFirst(); if (failure.isPresent()) { failures.remove(failure.get()); - return Optional.of(failure.get().failReturnCode); + return Optional.ofNullable(failure.get().failReturnCode); } else { return Optional.empty(); } @@ -1131,6 +1131,18 @@ public void failConditional(KeeperException.Code rc, BiPredicate pre failures.add(new Failure(rc, predicate)); } + public void delay(long millis, BiPredicate predicate) { + failures.add(new Failure(null, (op, s) -> { + if (predicate.test(op, s)) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) {} + return true; + } + return false; + })); + } + public void setAlwaysFail(KeeperException.Code rc) { this.alwaysFail.set(rc); }