From 957337bc02af4f04f8da914bf328bd3d36c2c8b2 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Mon, 6 Nov 2023 15:55:07 +0800 Subject: [PATCH] [fix][client] Avert extensive time consumption during table view construction (#21270) Reopen https://github.com/apache/pulsar/pull/21170 ### Motivation If a topic persistently experiences a substantial quantity of data inputs, the act of reading all the messages present in this topic to build a TableView can take an excessive amount of time. ### Modification In the process of constructing the TableView, initially, the last message ID of the current topic is procured. Consequently, once this last message ID has been reached, the creation ensues to its completion. --- .../pulsar/client/impl/TableViewTest.java | 60 +++++++++++++++++++ .../pulsar/client/impl/TableViewImpl.java | 28 +++++++-- 2 files changed, 84 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index 6c6da5870aed9..523360884c1bf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -20,16 +20,21 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; +import java.lang.reflect.Method; import java.time.Duration; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -39,6 +44,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; @@ -46,6 +52,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; @@ -438,4 +445,57 @@ public void testTableViewTailMessageReadRetry() throws Exception { }); verify(consumer, times(msgCnt)).receiveAsync(); } + + @Test + public void testBuildTableViewWithMessagesAlwaysAvailable() throws Exception { + String topic = "persistent://public/default/testBuildTableViewWithMessagesAlwaysAvailable"; + admin.topics().createPartitionedTopic(topic, 10); + @Cleanup + Reader reader = pulsarClient.newReader() + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .create(); + // Prepare real data to do test. + for (int i = 0; i < 1000; i++) { + producer.newMessage().send(); + } + List lastMessageIds = reader.getLastMessageIds(); + + // Use mock reader to build tableview. In the old implementation, the readAllExistingMessages method + // will not be completed because the `mockReader.hasMessageAvailable()` always return ture. + Reader mockReader = spy(reader); + when(mockReader.hasMessageAvailable()).thenReturn(true); + when(mockReader.getLastMessageIdsAsync()).thenReturn(CompletableFuture.completedFuture(lastMessageIds)); + AtomicInteger index = new AtomicInteger(lastMessageIds.size()); + when(mockReader.readNextAsync()).thenAnswer(invocation -> { + Message message = spy(Message.class); + int localIndex = index.decrementAndGet(); + if (localIndex >= 0) { + when(message.getTopicName()).thenReturn(lastMessageIds.get(localIndex).getOwnerTopic()); + when(message.getMessageId()).thenReturn(lastMessageIds.get(localIndex)); + when(message.hasKey()).thenReturn(false); + doNothing().when(message).release(); + } + return CompletableFuture.completedFuture(message); + }); + @Cleanup + TableViewImpl tableView = (TableViewImpl) pulsarClient.newTableView() + .topic(topic) + .createAsync() + .get(); + TableViewImpl mockTableView = spy(tableView); + Method readAllExistingMessagesMethod = TableViewImpl.class + .getDeclaredMethod("readAllExistingMessages", Reader.class); + readAllExistingMessagesMethod.setAccessible(true); + CompletableFuture> future = + (CompletableFuture>) readAllExistingMessagesMethod.invoke(mockTableView, mockReader); + + // The future will complete after receive all the messages from lastMessageIds. + future.get(3, TimeUnit.SECONDS); + assertTrue(index.get() <= 0); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index 560636f94622b..151c96d96aa40 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; +import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.topics.TopicCompactionStrategy; @@ -235,20 +236,40 @@ private CompletableFuture> readAllExistingMessages(Reader reader) { AtomicLong messagesRead = new AtomicLong(); CompletableFuture> future = new CompletableFuture<>(); - readAllExistingMessages(reader, future, startTime, messagesRead); + reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> { + Map maxMessageIds = new ConcurrentHashMap<>(); + lastMessageIds.forEach(topicMessageId -> { + maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId); + }); + readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds); + }).exceptionally(ex -> { + future.completeExceptionally(ex); + return null; + }); + future.thenAccept(__ -> readTailMessages(reader)); return future; } private void readAllExistingMessages(Reader reader, CompletableFuture> future, long startTime, - AtomicLong messagesRead) { + AtomicLong messagesRead, Map maxMessageIds) { reader.hasMessageAvailableAsync() .thenAccept(hasMessage -> { if (hasMessage) { reader.readNextAsync() .thenAccept(msg -> { messagesRead.incrementAndGet(); + // We need remove the partition from the maxMessageIds map + // once the partition has been read completely. + TopicMessageId maxMessageId = maxMessageIds.get(msg.getTopicName()); + if (maxMessageId != null && msg.getMessageId().compareTo(maxMessageId) >= 0) { + maxMessageIds.remove(msg.getTopicName()); + } handleMessage(msg); - readAllExistingMessages(reader, future, startTime, messagesRead); + if (maxMessageIds.isEmpty()) { + future.complete(reader); + } else { + readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds); + } }).exceptionally(ex -> { if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { log.error("Reader {} was closed while reading existing messages.", @@ -269,7 +290,6 @@ private void readAllExistingMessages(Reader reader, CompletableFuture