Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[improve][ws] Support subscribing multi/pattern topic for Websocket (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Nov 6, 2023
1 parent 957337b commit eebd821
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketMultiTopicConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -1081,6 +1082,11 @@ private void addWebSocketServiceHandler(WebService webService,
new ServletHolder(readerWebSocketServlet), true, attributeMap);
webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet), true, attributeMap);

final WebSocketMultiTopicConsumerServlet multiTopicConsumerWebSocketServlet =
new WebSocketMultiTopicConsumerServlet(webSocketService);
webService.addServlet(WebSocketMultiTopicConsumerServlet.SERVLET_PATH,
new ServletHolder(multiTopicConsumerWebSocketServlet), true, attributeMap);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,7 @@ public void ackBatchMessageTest() throws Exception {

WebSocketClient consumerClient = new WebSocketClient();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
Expand Down Expand Up @@ -933,6 +934,7 @@ public void consumeEncryptedMessages() throws Exception {
final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==";
final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
Expand Down Expand Up @@ -1051,5 +1053,71 @@ private void stopWebSocketClient(WebSocketClient... clients) {
log.info("proxy clients are stopped successfully");
}

@Test
public void testMultiTopics() throws Exception {
final String subscription1 = "my-sub1";
final String subscription2 = "my-sub2";
final String topic1 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
final String topic2 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
final String consumerUri1 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v3/consumer/" + subscription1 + "?topics=" + topic1 + "," + topic2;

final String consumerUri2 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
"/ws/v3/consumer/" + subscription2 + "?topicsPattern=my-property/my-ns/testMultiTopics.*";

int messages = 10;
WebSocketClient consumerClient1 = new WebSocketClient();
WebSocketClient consumerClient2 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
@Cleanup
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic(topic1)
.batchingMaxMessages(1)
.create();
@Cleanup
Producer<byte[]> producer2 = pulsarClient.newProducer()
.topic(topic2)
.batchingMaxMessages(1)
.create();

try {
consumerClient1.start();
consumerClient2.start();
ClientUpgradeRequest consumerRequest1 = new ClientUpgradeRequest();
ClientUpgradeRequest consumerRequest2 = new ClientUpgradeRequest();
Future<Session> consumerFuture1 = consumerClient1.connect(consumeSocket1, URI.create(consumerUri1), consumerRequest1);
Future<Session> consumerFuture2 = consumerClient2.connect(consumeSocket2, URI.create(consumerUri2), consumerRequest2);

assertTrue(consumerFuture1.get().isOpen());
assertTrue(consumerFuture2.get().isOpen());
assertEquals(consumeSocket1.getReceivedMessagesCount(), 0);
assertEquals(consumeSocket2.getReceivedMessagesCount(), 0);

for (int i = 1; i <= messages; i ++) {
producer1.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
producer2.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
}
producer1.flush();
producer2.flush();

consumeSocket1.sendPermits(2 * messages);
Awaitility.await().untilAsserted(() ->
assertEquals(consumeSocket1.getReceivedMessagesCount(), 2 * messages));
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topic1).getSubscriptions()
.get(subscription1).getMsgBacklog(), 0));
Awaitility.await().untilAsserted(() ->
assertEquals(admin.topics().getStats(topic2).getSubscriptions()
.get(subscription1).getMsgBacklog(), 0));

consumeSocket2.sendPermits(2 * messages);
Awaitility.await().untilAsserted(() ->
assertEquals(consumeSocket2.getReceivedMessagesCount(), 2 * messages));
} finally {
stopWebSocketClient(consumerClient1, consumerClient2);
}
}

private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.docs.tools.CmdGenerateDocs;
import org.apache.pulsar.proxy.stats.ProxyStats;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketMultiTopicConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -336,6 +337,11 @@ public static void addWebServerHandlers(WebServer server,
new ServletHolder(readerWebSocketServlet));
server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet));

final WebSocketMultiTopicConsumerServlet multiTopicConsumerWebSocketServlet =
new WebSocketMultiTopicConsumerServlet(webSocketService);
server.addServlet(WebSocketMultiTopicConsumerServlet.SERVLET_PATH,
new ServletHolder(multiTopicConsumerWebSocketServlet));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
protected final WebSocketService service;
protected final HttpServletRequest request;

protected final TopicName topic;
protected TopicName topic;
protected final Map<String, String> queryParams;
private static final String PULSAR_AUTH_METHOD_NAME = "X-Pulsar-Auth-Method-Name";
protected final ObjectReader consumerCommandReader =
Expand All @@ -80,12 +80,12 @@ public AbstractWebSocketHandler(WebSocketService service,
ServletUpgradeResponse response) {
this.service = service;
this.request = new WebSocketHttpServletRequestWrapper(request);
this.topic = extractTopicName(request);

this.queryParams = new TreeMap<>();
request.getParameterMap().forEach((key, values) -> {
queryParams.put(key, values[0]);
});
extractTopicName(request);
}

protected boolean checkAuth(ServletUpgradeResponse response) {
Expand Down Expand Up @@ -244,7 +244,7 @@ protected String checkAuthentication() {
return null;
}

private TopicName extractTopicName(HttpServletRequest request) {
protected void extractTopicName(HttpServletRequest request) {
String uri = request.getRequestURI();
List<String> parts = Splitter.on("/").splitToList(uri);

Expand Down Expand Up @@ -287,7 +287,7 @@ private TopicName extractTopicName(HttpServletRequest request) {
}
final String name = Codec.decode(topicName.toString());

return TopicName.get(domain, namespace, name);
topic = TopicName.get(domain, namespace, name);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,12 @@
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
Expand All @@ -75,7 +72,7 @@
*/
public class ConsumerHandler extends AbstractWebSocketHandler {

private String subscription = null;
protected String subscription = null;
private SubscriptionType subscriptionType;
private SubscriptionMode subscriptionMode;
private Consumer<byte[]> consumer;
Expand All @@ -88,6 +85,10 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
private final LongAdder numBytesDelivered;
private final LongAdder numMsgsAcked;
private volatile long msgDeliveredCounter = 0;

protected String topicsPattern;

protected String topics;
private static final AtomicLongFieldUpdater<ConsumerHandler> MSG_DELIVERED_COUNTER_UPDATER =
AtomicLongFieldUpdater.newUpdater(ConsumerHandler.class, "msgDeliveredCounter");

Expand Down Expand Up @@ -123,7 +124,14 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser
return;
}

this.consumer = builder.topic(topic.toString()).subscriptionName(subscription).subscribe();
if (topicsPattern != null) {
this.consumer = builder.topicsPattern(topicsPattern).subscriptionName(subscription).subscribe();
} else if (topics != null) {
this.consumer = builder.topics(Splitter.on(",").splitToList(topics))
.subscriptionName(subscription).subscribe();
} else {
this.consumer = builder.topic(topic.toString()).subscriptionName(subscription).subscribe();
}
if (!this.service.addConsumer(this)) {
log.warn("[{}:{}] Failed to add consumer handler for topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic);
Expand Down Expand Up @@ -299,8 +307,7 @@ private void checkResumeReceive() {

private void handleAck(ConsumerCommand command) throws IOException {
// We should have received an ack
TopicMessageId msgId = new TopicMessageIdImpl(topic.toString(),
(MessageIdAdv) MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
MessageId msgId = MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId));
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(),
subscription, msgId, getRemote().getInetSocketAddress().toString());
Expand Down Expand Up @@ -490,7 +497,7 @@ protected Boolean isAuthorized(String authRole, AuthenticationDataSource authent
}
}

public static String extractSubscription(HttpServletRequest request) {
public String extractSubscription(HttpServletRequest request) {
String uri = request.getRequestURI();
List<String> parts = Splitter.on("/").splitToList(uri);

Expand Down
Loading

0 comments on commit eebd821

Please sign in to comment.