Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -765,31 +765,15 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
return brokerService.checkTopicNsOwnership(getName())
.thenCompose(__ ->
incrementTopicEpochIfNeeded(producer, producerQueuedFuture))
.thenCompose(producerEpoch -> {
lock.writeLock().lock();
try {
checkTopicFenced();
if (isMigrated()) {
log.warn("[{}] Attempting to add producer to a migrated topic", topic);
throw new TopicMigratedException("Topic was already migrated");
} else if (isTerminated()) {
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
return internalAddProducer(producer).thenApply(ignore -> {
.thenCompose(producerEpoch ->
internalAddProducer(producer).thenApply(ignore -> {
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
return producerEpoch;
});
} catch (BrokerServiceException e) {
return FutureUtil.failedFuture(e);
} finally {
lock.writeLock().unlock();
}
});
}));
}

protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer,
Expand Down Expand Up @@ -969,23 +953,40 @@ protected void checkTopicFenced() throws BrokerServiceException {
}

protected CompletableFuture<Void> internalAddProducer(Producer producer) {
if (isSameAddressProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic);
return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(
"Topic '" + topic + "' reached max same address producers limit"));
}
lock.writeLock().lock();
try {
checkTopicFenced();
if (isMigrated()) {
log.warn("[{}] Attempting to add producer to a migrated topic", topic);
throw new TopicMigratedException("Topic was already migrated");
} else if (isTerminated()) {
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}

if (log.isDebugEnabled()) {
log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
}
if (isSameAddressProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit",
topic);
return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException(
"Topic '" + topic + "' reached max same address producers limit"));
}

if (log.isDebugEnabled()) {
log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
}

Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
return tryOverwriteOldProducer(existProducer, producer);
} else if (!producer.isRemote()) {
USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
return tryOverwriteOldProducer(existProducer, producer);
} else if (!producer.isRemote()) {
USER_CREATED_PRODUCER_COUNTER_UPDATER.incrementAndGet(this);
}
return CompletableFuture.completedFuture(null);
} catch (BrokerServiceException e) {
return FutureUtil.failedFuture(e);
} finally {
lock.writeLock().unlock();
}
return CompletableFuture.completedFuture(null);
}

private CompletableFuture<Void> tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,58 +18,163 @@
*/
package org.apache.pulsar.broker.service;

import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class AbstractTopicTest {
private AbstractSubscription subscription;
private AbstractTopic topic;
public class AbstractTopicTest extends BrokerTestBase {

private static final Logger LOG = LoggerFactory.getLogger(AbstractTopicTest.class);

@DataProvider(name = "topicAndSubscription")
public Object[][] topicAndSubscriptionProvider() {
Supplier<Pair<AbstractTopic, AbstractSubscription>> persistentProvider =
createProvider(TopicDomain.persistent, PersistentSubscription.class);
Supplier<Pair<AbstractTopic, AbstractSubscription>> nonPersistentProvider =
createProvider(TopicDomain.non_persistent, NonPersistentSubscription.class);
return new Object[][]{{persistentProvider}, {nonPersistentProvider}};
}

private Supplier<Pair<AbstractTopic, AbstractSubscription>> createProvider(
TopicDomain topicDomain, Class<? extends AbstractSubscription> subscriptionClass) {
return () -> {
String topicName = topicDomain.value() + "://public/default/topic-0";
try {
admin.topics().createNonPartitionedTopic(topicName);
} catch (PulsarAdminException e) {
throw new RuntimeException("Create Topic failed", e);
}
Optional<Topic> topicOpt = pulsar.getBrokerService()
.getTopicReference(topicName);
if (topicOpt.isEmpty()) {
throw new RuntimeException("Topic " + topicName + " not found");
}
Topic topic = spy(topicOpt.get());
AbstractSubscription subscription = mock(subscriptionClass);
doReturn(Map.of("subscription", subscription))
.when(topic).getSubscriptions();
return Pair.of((AbstractTopic) topic, subscription);
};
}

@BeforeMethod
public void beforeMethod() {
BrokerService brokerService = mock(BrokerService.class);
PulsarService pulsarService = mock(PulsarService.class);
ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class);
subscription = mock(AbstractSubscription.class);

when(brokerService.pulsar()).thenReturn(pulsarService);
doReturn(pulsarService).when(brokerService).getPulsar();
when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);
when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager);
doReturn(AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK).when(pulsarService).getMonotonicClock();

topic = mock(AbstractTopic.class, withSettings()
.useConstructor("topic", brokerService)
.defaultAnswer(CALLS_REAL_METHODS));

final var subscriptions = new ConcurrentHashMap<String, Subscription>();
subscriptions.put("subscription", subscription);
when(topic.getSubscriptions()).thenAnswer(invocation -> subscriptions);
@Override
protected void setup() throws Exception {
super.internalSetup();
super.setupDefaultTenantAndNamespace();
}

@Test
public void testGetMsgOutCounter() {
@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(dataProvider = "topicAndSubscription")
public void testGetMsgOutCounter(Supplier<Pair<AbstractTopic, AbstractSubscription>> provider) {
Pair<AbstractTopic, AbstractSubscription> topicAndSubscription = provider.get();
AbstractTopic topic = topicAndSubscription.getLeft();
AbstractSubscription subscription = topicAndSubscription.getRight();
topic.msgOutFromRemovedSubscriptions.add(1L);
when(subscription.getMsgOutCounter()).thenReturn(2L);
assertEquals(topic.getMsgOutCounter(), 3L);
}

@Test
public void testGetBytesOutCounter() {
@Test(dataProvider = "topicAndSubscription")
public void testGetBytesOutCounter(Supplier<Pair<AbstractTopic, AbstractSubscription>> provider) {
Pair<AbstractTopic, AbstractSubscription> topicAndSubscription = provider.get();
AbstractTopic topic = topicAndSubscription.getLeft();
AbstractSubscription subscription = topicAndSubscription.getRight();
topic.bytesOutFromRemovedSubscriptions.add(1L);
when(subscription.getBytesOutCounter()).thenReturn(2L);
assertEquals(topic.getBytesOutCounter(), 3L);
}

@Test(dataProvider = "topicAndSubscription")
public void testOverwriteOldProducerAfterTopicClosed(
Supplier<Pair<AbstractTopic, AbstractSubscription>> provider) throws InterruptedException {
Pair<AbstractTopic, AbstractSubscription> topicAndSubscription = provider.get();
AbstractTopic topic = topicAndSubscription.getLeft();
Producer oldProducer = spy(createProducer(topic));
ServerCnx oldCnx = spy((ServerCnx) oldProducer.getCnx());
doReturn(oldCnx).when(oldProducer).getCnx();

// Add old producer
topic.addProducer(oldProducer, new CompletableFuture<>()).join();

CountDownLatch oldCnxCheckInvokedLatch = new CountDownLatch(1);
CountDownLatch oldCnxCheckStartLatch = new CountDownLatch(1);
doAnswer(invocation -> {
CompletableFuture<Optional<Boolean>> future = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try {
oldCnxCheckInvokedLatch.countDown();
oldCnxCheckStartLatch.await();
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
future.complete(Optional.of(false));
});
return future;
}).when(oldCnx).checkConnectionLiveness();

// Add new producer
Producer newProducer = createProducer(topic);
CompletableFuture<Optional<Long>> producerEpoch =
topic.addProducer(newProducer, new CompletableFuture<>());

// Wait until new producer entered `AbstractTopic#tryOverwriteOldProducer`
oldCnxCheckInvokedLatch.await();

topic.close(true);
// Run pending tasks to remove old producer from topic.
((EmbeddedChannel) oldCnx.ctx().channel()).runPendingTasks();

// Unblock ServerCnx#checkConnectionLiveness to resume `AbstractTopic#tryOverwriteOldProducer`
oldCnxCheckStartLatch.countDown();

// As topic is fenced, adding new producer should fail.
try {
producerEpoch.join();
fail("TopicFencedException expected");
} catch (CompletionException e) {
LOG.error("Failed to add producer", e);
assertEquals(e.getCause().getClass(), BrokerServiceException.TopicFencedException.class);
}
}

private Producer createProducer(Topic topic) {
ServerCnx serverCnx = new ServerCnx(pulsar);
new EmbeddedChannel(serverCnx);
assertNotNull(serverCnx.ctx());
return new Producer(topic, serverCnx, 1, "prod-name",
"app-0", false, null, SchemaVersion.Latest, 0, false,
ProducerAccessMode.Shared, Optional.empty(), true);
}
}
Loading