Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix] [broker] Fix can not subscribe partitioned topic with a suffix-…
Browse files Browse the repository at this point in the history
…matched regexp (apache#22025)
  • Loading branch information
poorbarcode authored Feb 19, 2024
1 parent 825e997 commit b375e86
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public TopicResources(MetadataStore store) {
store.registerListener(this::handleNotification);
}

/***
* List persistent topics names under a namespace, the topic name contains the partition suffix.
*/
public CompletableFuture<List<String>> listPersistentTopicsAsync(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,9 @@ private CompletableFuture<List<String>> getPartitionsForTopic(TopicName topicNam
});
}

/***
* List persistent topics names under a namespace, the topic name contains the partition suffix.
*/
public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) {
return pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(namespaceName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,18 @@ public void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError err
writeAndFlush(outBuf);
}

/***
* @param topics topic names which are matching, the topic name contains the partition suffix.
*/
@Override
public void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List<String> topics) {
BaseCommand command = Commands.newWatchTopicListSuccess(requestId, watcherId, topicsHash, topics);
interceptAndWriteCommand(command);
}

/***
* {@inheritDoc}
*/
@Override
public void sendWatchTopicListUpdate(long watcherId,
List<String> newTopics, List<String> deletedTopics, String topicsHash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
Expand All @@ -42,11 +43,16 @@ public class TopicListService {

public static class TopicListWatcher implements BiConsumer<String, NotificationType> {

/** Topic names which are matching, the topic name contains the partition suffix. **/
private final List<String> matchingTopics;
private final TopicListService topicListService;
private final long id;
/** The regexp for the topic name(not contains partition suffix). **/
private final Pattern topicsPattern;

/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public TopicListWatcher(TopicListService topicListService, long id,
Pattern topicsPattern, List<String> topics) {
this.topicListService = topicListService;
Expand All @@ -59,9 +65,12 @@ public List<String> getMatchingTopics() {
return matchingTopics;
}

/***
* @param topicName topic name which contains partition suffix.
*/
@Override
public void accept(String topicName, NotificationType notificationType) {
if (topicsPattern.matcher(topicName).matches()) {
if (topicsPattern.matcher(TopicName.get(topicName).getPartitionedTopicName()).matches()) {
List<String> newTopics;
List<String> deletedTopics;
if (notificationType == NotificationType.Deleted) {
Expand Down Expand Up @@ -109,6 +118,9 @@ public void inactivate() {
}
}

/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, long requestId, Pattern topicsPattern,
String topicsHash, Semaphore lookupSemaphore) {

Expand Down Expand Up @@ -184,7 +196,9 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
});
}


/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture,
NamespaceName namespace, long watcherId, Pattern topicsPattern) {
namespaceService.getListOfPersistentTopics(namespace).
Expand Down Expand Up @@ -246,6 +260,10 @@ public void deleteTopicListWatcher(Long watcherId) {
log.info("[{}] Closed watcher, watcherId={}", connection.getRemoteAddress(), watcherId);
}

/**
* @param deletedTopics topic names deleted(contains the partition suffix).
* @param newTopics topics names added(contains the partition suffix).
*/
public void sendTopicListUpdate(long watcherId, String topicsHash, List<String> deletedTopics,
List<String> newTopics) {
connection.getCommandSender().sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,16 +681,27 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher(boolean delayWatchi
}
}

@DataProvider(name= "partitioned")
public Object[][] partitioned(){
@DataProvider(name= "regexpConsumerArgs")
public Object[][] regexpConsumerArgs(){
return new Object[][]{
{true},
{false}
{true, true},
{true, false},
{false, true},
{false, false}
};
}

@Test(timeOut = testTimeout, dataProvider = "partitioned")
public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception {
private void waitForTopicListWatcherStarted(Consumer consumer) {
Awaitility.await().untilAsserted(() -> {
CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture");
log.info("isDone: {}, isCompletedExceptionally: {}", completableFuture.isDone(),
completableFuture.isCompletedExceptionally());
assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally());
});
}

@Test(timeOut = testTimeout, dataProvider = "regexpConsumerArgs")
public void testPreciseRegexpSubscribe(boolean partitioned, boolean createTopicAfterWatcherStarted) throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
final String subscriptionName = "s1";
final Pattern pattern = Pattern.compile(String.format("%s$", topicName));
Expand All @@ -704,6 +715,9 @@ public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception {
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();
if (createTopicAfterWatcherStarted) {
waitForTopicListWatcherStarted(consumer);
}

// 1. create topic.
if (partitioned) {
Expand Down Expand Up @@ -733,6 +747,14 @@ public void testPreciseRegexpSubscribe(boolean partitioned) throws Exception {
}
}

@DataProvider(name= "partitioned")
public Object[][] partitioned(){
return new Object[][]{
{true},
{true}
};
}

@Test(timeOut = 240 * 1000, dataProvider = "partitioned")
public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean partitioned) throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> topics(List<String> topicNames);

/**
* Specify a pattern for topics that this consumer subscribes to.
* Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.
*
* <p>The pattern is applied to subscribe to all topics, within a single namespace, that match the
* pattern.
*
* <p>The consumer automatically subscribes to topics created after itself.
*
* @param topicsPattern
* a regular expression to select a list of topics to subscribe to
* a regular expression to select a list of topics(not contains the partition suffix) to subscribe to
* @return the consumer builder instance
*/
ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);

/**
* Specify a pattern for topics that this consumer subscribes to.
* Specify a pattern for topics(not contains the partition suffix) that this consumer subscribes to.
*
* <p>It accepts a regular expression that is compiled into a pattern internally. E.g.,
* "persistent://public/default/pattern-topic-.*"
Expand All @@ -151,7 +151,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* <p>The consumer automatically subscribes to topics created after itself.
*
* @param topicsPattern
* given regular expression for topics pattern
* given regular expression for topics(not contains the partition suffix) pattern
* @return the consumer builder instance
*/
ConsumerBuilder<T> topicsPattern(String topicsPattern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,10 @@ private void removeTopic(String topic) {
}
}

// subscribe one more given topic
/***
* Subscribe one more given topic.
* @param topicName topic name without the partition suffix.
*/
public CompletableFuture<Void> subscribeAsync(String topicName, boolean createTopicIfDoesNotExist) {
TopicName topicNameInstance = getTopicName(topicName);
if (topicNameInstance == null) {
Expand Down Expand Up @@ -1251,7 +1254,10 @@ public CompletableFuture<Void> unsubscribeAsync(String topicName) {
return unsubscribeFuture;
}

// Remove a consumer for a topic
/***
* Remove a consumer for a topic.
* @param topicName topic name contains the partition suffix.
*/
public CompletableFuture<Void> removeConsumerAsync(String topicName) {
checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
private volatile Timeout recheckPatternTimeout = null;
private volatile String topicsHash;

/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
String topicsHash,
PulsarClientImpl client,
Expand Down Expand Up @@ -220,14 +223,26 @@ void setTopicsHash(String topicsHash) {
}

interface TopicsChangedListener {
// unsubscribe and delete ConsumerImpl in the `consumers` map in `MultiTopicsConsumerImpl` based on added topics
/***
* unsubscribe and delete {@link ConsumerImpl} in the {@link MultiTopicsConsumerImpl#consumers} map in
* {@link MultiTopicsConsumerImpl}.
* @param removedTopics topic names removed(contains the partition suffix).
*/
CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics);
// subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in
// `MultiTopicsConsumerImpl`.

/***
* subscribe and create a list of new {@link ConsumerImpl}, added them to the
* {@link MultiTopicsConsumerImpl#consumers} map in {@link MultiTopicsConsumerImpl}.
* @param addedTopics topic names added(contains the partition suffix).
*/
CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
}

private class PatternTopicsChangedListener implements TopicsChangedListener {

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics) {
CompletableFuture<Void> removeFuture = new CompletableFuture<>();
Expand All @@ -249,6 +264,9 @@ public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics)
return removeFuture;
}

/**
* {@inheritDoc}
*/
@Override
public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
CompletableFuture<Void> addFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
private final Runnable recheckTopicsChangeAfterReconnect;


/***
* @param topicsPattern The regexp for the topic name(not contains partition suffix).
*/
public TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener,
PulsarClientImpl client, Pattern topicsPattern, long watcherId,
NamespaceName namespace, String topicsHash,
Expand Down Expand Up @@ -142,7 +145,6 @@ public CompletableFuture<Void> connectionOpened(ClientCnx cnx) {
return;
}
}

this.connectionHandler.resetBackoff();

recheckTopicsChangeAfterReconnect.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {

@ApiModelProperty(
name = "topicsPattern",
value = "Topic pattern"
value = "The regexp for the topic name(not contains partition suffix)."
)
private Pattern topicsPattern;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,9 @@ public static BaseCommand newWatchTopicList(
return cmd;
}

/***
* @param topics topic names which are matching, the topic name contains the partition suffix.
*/
public static BaseCommand newWatchTopicListSuccess(long requestId, long watcherId, String topicsHash,
List<String> topics) {
BaseCommand cmd = localCmd(Type.WATCH_TOPIC_LIST_SUCCESS);
Expand All @@ -1600,6 +1603,10 @@ public static BaseCommand newWatchTopicListSuccess(long requestId, long watcherI
return cmd;
}

/**
* @param deletedTopics topic names deleted(contains the partition suffix).
* @param newTopics topics names added(contains the partition suffix).
*/
public static BaseCommand newWatchTopicUpdate(long watcherId,
List<String> newTopics, List<String> deletedTopics, String topicsHash) {
BaseCommand cmd = localCmd(Type.WATCH_TOPIC_UPDATE);
Expand Down

0 comments on commit b375e86

Please sign in to comment.