diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index e8253771eded4..6cf21febb54f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -765,31 +765,15 @@ public CompletableFuture> addProducer(Producer producer, return brokerService.checkTopicNsOwnership(getName()) .thenCompose(__ -> incrementTopicEpochIfNeeded(producer, producerQueuedFuture)) - .thenCompose(producerEpoch -> { - lock.writeLock().lock(); - try { - checkTopicFenced(); - if (isMigrated()) { - log.warn("[{}] Attempting to add producer to a migrated topic", topic); - throw new TopicMigratedException("Topic was already migrated"); - } else if (isTerminated()) { - log.warn("[{}] Attempting to add producer to a terminated topic", topic); - throw new TopicTerminatedException("Topic was already terminated"); - } - return internalAddProducer(producer).thenApply(ignore -> { + .thenCompose(producerEpoch -> + internalAddProducer(producer).thenApply(ignore -> { USAGE_COUNT_UPDATER.incrementAndGet(this); if (log.isDebugEnabled()) { log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), USAGE_COUNT_UPDATER.get(this)); } return producerEpoch; - }); - } catch (BrokerServiceException e) { - return FutureUtil.failedFuture(e); - } finally { - lock.writeLock().unlock(); - } - }); + })); } protected CompletableFuture> incrementTopicEpochIfNeeded(Producer producer, @@ -969,23 +953,40 @@ protected void checkTopicFenced() throws BrokerServiceException { } protected CompletableFuture internalAddProducer(Producer producer) { - if (isSameAddressProducersExceeded(producer)) { - log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic); - return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException( - "Topic '" + topic + "' reached max same address producers limit")); - } + lock.writeLock().lock(); + try { + checkTopicFenced(); + if (isMigrated()) { + log.warn("[{}] Attempting to add producer to a migrated topic", topic); + throw new TopicMigratedException("Topic was already migrated"); + } else if (isTerminated()) { + log.warn("[{}] Attempting to add producer to a terminated topic", topic); + throw new TopicTerminatedException("Topic was already terminated"); + } - if (log.isDebugEnabled()) { - log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName()); - } + if (isSameAddressProducersExceeded(producer)) { + log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", + topic); + return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException( + "Topic '" + topic + "' reached max same address producers limit")); + } + + if (log.isDebugEnabled()) { + log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName()); + } - Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer); - if (existProducer != null) { - return tryOverwriteOldProducer(existProducer, producer); - } else if (!producer.isRemote()) { - USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this); + Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer); + if (existProducer != null) { + return tryOverwriteOldProducer(existProducer, producer); + } else if (!producer.isRemote()) { + USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this); + } + return CompletableFuture.completedFuture(null); + } catch (BrokerServiceException e) { + return FutureUtil.failedFuture(e); + } finally { + lock.writeLock().unlock(); } - return CompletableFuture.completedFuture(null); } private CompletableFuture tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java index 1e15bcf12c08d..b3fdc4a5ea208 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java @@ -18,58 +18,163 @@ */ package org.apache.pulsar.broker.service; -import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; import static org.testng.Assert.assertEquals; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.qos.AsyncTokenBucket; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; +import io.netty.channel.embedded.EmbeddedChannel; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.function.Supplier; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.api.proto.ProducerAccessMode; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") -public class AbstractTopicTest { - private AbstractSubscription subscription; - private AbstractTopic topic; +public class AbstractTopicTest extends BrokerTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractTopicTest.class); + + @DataProvider(name = "topicAndSubscription") + public Object[][] topicAndSubscriptionProvider() { + Supplier> persistentProvider = + createProvider(TopicDomain.persistent, PersistentSubscription.class); + Supplier> nonPersistentProvider = + createProvider(TopicDomain.non_persistent, NonPersistentSubscription.class); + return new Object[][]{{persistentProvider}, {nonPersistentProvider}}; + } + + private Supplier> createProvider( + TopicDomain topicDomain, Class subscriptionClass) { + return () -> { + String topicName = topicDomain.value() + "://public/default/topic-0"; + try { + admin.topics().createNonPartitionedTopic(topicName); + } catch (PulsarAdminException e) { + throw new RuntimeException("Create Topic failed", e); + } + Optional topicOpt = pulsar.getBrokerService() + .getTopicReference(topicName); + if (topicOpt.isEmpty()) { + throw new RuntimeException("Topic " + topicName + " not found"); + } + Topic topic = spy(topicOpt.get()); + AbstractSubscription subscription = mock(subscriptionClass); + doReturn(Map.of("subscription", subscription)) + .when(topic).getSubscriptions(); + return Pair.of((AbstractTopic) topic, subscription); + }; + } @BeforeMethod - public void beforeMethod() { - BrokerService brokerService = mock(BrokerService.class); - PulsarService pulsarService = mock(PulsarService.class); - ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class); - BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class); - subscription = mock(AbstractSubscription.class); - - when(brokerService.pulsar()).thenReturn(pulsarService); - doReturn(pulsarService).when(brokerService).getPulsar(); - when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration); - when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager); - doReturn(AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK).when(pulsarService).getMonotonicClock(); - - topic = mock(AbstractTopic.class, withSettings() - .useConstructor("topic", brokerService) - .defaultAnswer(CALLS_REAL_METHODS)); - - final var subscriptions = new ConcurrentHashMap(); - subscriptions.put("subscription", subscription); - when(topic.getSubscriptions()).thenAnswer(invocation -> subscriptions); + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.setupDefaultTenantAndNamespace(); } - @Test - public void testGetMsgOutCounter() { + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(dataProvider = "topicAndSubscription") + public void testGetMsgOutCounter(Supplier> provider) { + Pair topicAndSubscription = provider.get(); + AbstractTopic topic = topicAndSubscription.getLeft(); + AbstractSubscription subscription = topicAndSubscription.getRight(); topic.msgOutFromRemovedSubscriptions.add(1L); when(subscription.getMsgOutCounter()).thenReturn(2L); assertEquals(topic.getMsgOutCounter(), 3L); } - @Test - public void testGetBytesOutCounter() { + @Test(dataProvider = "topicAndSubscription") + public void testGetBytesOutCounter(Supplier> provider) { + Pair topicAndSubscription = provider.get(); + AbstractTopic topic = topicAndSubscription.getLeft(); + AbstractSubscription subscription = topicAndSubscription.getRight(); topic.bytesOutFromRemovedSubscriptions.add(1L); when(subscription.getBytesOutCounter()).thenReturn(2L); assertEquals(topic.getBytesOutCounter(), 3L); } + + @Test(dataProvider = "topicAndSubscription") + public void testOverwriteOldProducerAfterTopicClosed( + Supplier> provider) throws InterruptedException { + Pair topicAndSubscription = provider.get(); + AbstractTopic topic = topicAndSubscription.getLeft(); + Producer oldProducer = spy(createProducer(topic)); + ServerCnx oldCnx = spy((ServerCnx) oldProducer.getCnx()); + doReturn(oldCnx).when(oldProducer).getCnx(); + + // Add old producer + topic.addProducer(oldProducer, new CompletableFuture<>()).join(); + + CountDownLatch oldCnxCheckInvokedLatch = new CountDownLatch(1); + CountDownLatch oldCnxCheckStartLatch = new CountDownLatch(1); + doAnswer(invocation -> { + CompletableFuture> future = new CompletableFuture<>(); + CompletableFuture.runAsync(() -> { + try { + oldCnxCheckInvokedLatch.countDown(); + oldCnxCheckStartLatch.await(); + } catch (InterruptedException e) { + future.completeExceptionally(e); + } + future.complete(Optional.of(false)); + }); + return future; + }).when(oldCnx).checkConnectionLiveness(); + + // Add new producer + Producer newProducer = createProducer(topic); + CompletableFuture> producerEpoch = + topic.addProducer(newProducer, new CompletableFuture<>()); + + // Wait until new producer entered `AbstractTopic#tryOverwriteOldProducer` + oldCnxCheckInvokedLatch.await(); + + topic.close(true); + // Run pending tasks to remove old producer from topic. + ((EmbeddedChannel) oldCnx.ctx().channel()).runPendingTasks(); + + // Unblock ServerCnx#checkConnectionLiveness to resume `AbstractTopic#tryOverwriteOldProducer` + oldCnxCheckStartLatch.countDown(); + + // As topic is fenced, adding new producer should fail. + try { + producerEpoch.join(); + fail("TopicFencedException expected"); + } catch (CompletionException e) { + LOG.error("Failed to add producer", e); + assertEquals(e.getCause().getClass(), BrokerServiceException.TopicFencedException.class); + } + } + + private Producer createProducer(Topic topic) { + ServerCnx serverCnx = new ServerCnx(pulsar); + new EmbeddedChannel(serverCnx); + assertNotNull(serverCnx.ctx()); + return new Producer(topic, serverCnx, 1, "prod-name", + "app-0", false, null, SchemaVersion.Latest, 0, false, + ProducerAccessMode.Shared, Optional.empty(), true); + } }