Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][client] Avert extensive time consumption during table view cons…
Browse files Browse the repository at this point in the history
…truction (apache#21270)

Reopen apache#21170
### Motivation
If a topic continuously receives a large volume of new messages, reading every message in the topic in order to build a TableView can take an excessive amount of time.
### Modification
When a TableView is constructed, the last message ID of each partition of the topic is fetched first. Once the reader has reached the last message ID of every partition, the initial build is considered complete; messages published after that point are handled by the tail-reading path.
  • Loading branch information
liangyepianzhou authored Nov 6, 2023
1 parent e5af097 commit 957337b
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,13 +44,15 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -438,4 +445,57 @@ public void testTableViewTailMessageReadRetry() throws Exception {
});
verify(consumer, times(msgCnt)).receiveAsync();
}

/**
 * Checks that building a table view finishes once the last message id of every partition has been
 * read, even when the reader keeps reporting that more messages are available.
 */
@Test
public void testBuildTableViewWithMessagesAlwaysAvailable() throws Exception {
    String topic = "persistent://public/default/testBuildTableViewWithMessagesAlwaysAvailable";
    admin.topics().createPartitionedTopic(topic, 10);

    @Cleanup
    Reader<byte[]> realReader = pulsarClient.newReader()
            .topic(topic)
            .startMessageId(MessageId.earliest)
            .create();
    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .create();

    // Publish real messages so that the partitions report genuine last message ids.
    int sent = 0;
    while (sent < 1000) {
        producer.newMessage().send();
        sent++;
    }
    List<TopicMessageId> lastIds = realReader.getLastMessageIds();

    // Build the table view on top of a spied reader. With the old implementation,
    // readAllExistingMessages would never complete because `hasMessageAvailable()` always returns true.
    Reader<byte[]> spiedReader = spy(realReader);
    when(spiedReader.hasMessageAvailable()).thenReturn(true);
    when(spiedReader.getLastMessageIdsAsync()).thenReturn(CompletableFuture.completedFuture(lastIds));

    // Counts down from the number of last message ids; each read hands out one of them, last to first.
    AtomicInteger remaining = new AtomicInteger(lastIds.size());
    when(spiedReader.readNextAsync()).thenAnswer(invocation -> {
        Message<byte[]> fakeMessage = spy(Message.class);
        int position = remaining.decrementAndGet();
        if (position >= 0) {
            TopicMessageId lastId = lastIds.get(position);
            when(fakeMessage.getTopicName()).thenReturn(lastId.getOwnerTopic());
            when(fakeMessage.getMessageId()).thenReturn(lastId);
            when(fakeMessage.hasKey()).thenReturn(false);
            doNothing().when(fakeMessage).release();
        }
        return CompletableFuture.completedFuture(fakeMessage);
    });

    @Cleanup
    TableViewImpl<byte[]> tableView = (TableViewImpl<byte[]>) pulsarClient.newTableView()
            .topic(topic)
            .createAsync()
            .get();
    TableViewImpl<byte[]> spiedTableView = spy(tableView);

    // readAllExistingMessages(Reader) is not accessible from the test, so it is invoked reflectively.
    Method readAllExisting = TableViewImpl.class.getDeclaredMethod("readAllExistingMessages", Reader.class);
    readAllExisting.setAccessible(true);
    Object invocationResult = readAllExisting.invoke(spiedTableView, spiedReader);
    CompletableFuture<Reader<?>> buildFuture = (CompletableFuture<Reader<?>>) invocationResult;

    // The future completes after every message from lastIds has been received.
    buildFuture.get(3, TimeUnit.SECONDS);
    assertTrue(remaining.get() <= 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;

Expand Down Expand Up @@ -235,20 +236,40 @@ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
AtomicLong messagesRead = new AtomicLong();

CompletableFuture<Reader<T>> future = new CompletableFuture<>();
readAllExistingMessages(reader, future, startTime, messagesRead);
reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
Map<String, TopicMessageId> maxMessageIds = new ConcurrentHashMap<>();
lastMessageIds.forEach(topicMessageId -> {
maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId);
});
readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds);
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
future.thenAccept(__ -> readTailMessages(reader));
return future;
}

private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> future, long startTime,
AtomicLong messagesRead) {
AtomicLong messagesRead, Map<String, TopicMessageId> maxMessageIds) {
reader.hasMessageAvailableAsync()
.thenAccept(hasMessage -> {
if (hasMessage) {
reader.readNextAsync()
.thenAccept(msg -> {
messagesRead.incrementAndGet();
// We need to remove the partition from the maxMessageIds map
// once the partition has been read completely.
TopicMessageId maxMessageId = maxMessageIds.get(msg.getTopicName());
if (maxMessageId != null && msg.getMessageId().compareTo(maxMessageId) >= 0) {
maxMessageIds.remove(msg.getTopicName());
}
handleMessage(msg);
readAllExistingMessages(reader, future, startTime, messagesRead);
if (maxMessageIds.isEmpty()) {
future.complete(reader);
} else {
readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds);
}
}).exceptionally(ex -> {
if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
log.error("Reader {} was closed while reading existing messages.",
Expand All @@ -269,7 +290,6 @@ private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<
messagesRead,
durationMillis / 1000.0);
future.complete(reader);
readTailMessages(reader);
}
});
}
Expand Down

0 comments on commit 957337b

Please sign in to comment.