Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -624,4 +624,38 @@ public void testBuildTableViewWithMessagesAlwaysAvailable() throws Exception {
future.get(3, TimeUnit.SECONDS);
assertTrue(index.get() <= 0);
}

@Test
public void testGetRawMessage() throws Exception {
String topic = "persistent://public/default/testGetRawMessage";
admin.topics().createNonPartitionedTopic(topic);

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();

String testKey = "key1";
String testValue = "value1";
producer.newMessage()
.key(testKey)
.value(testValue)
.property("myProp", "myValue")
.send();

@Cleanup
TableView<String> tableView = pulsarClient.newTableView(Schema.STRING)
.topic(topic)
.create();

Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> tableView.size() == 1);

Message<?> rawMessage = tableView.getRawMessage(testKey);
assertTrue(rawMessage != null, "Raw message should not be null for key: " + testKey);
assertEquals(rawMessage.getKey(), testKey);
assertEquals(new String(rawMessage.getData()), testValue);
assertEquals(rawMessage.getProperty("myProp"), "myValue");

Message<?> missingMessage = tableView.getRawMessage("missingKey");
assertTrue(missingMessage == null, "Raw message should be null for missing key");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ public interface TableView<T> extends Closeable {
*/
T get(String key);

/**
* Returns the raw Pulsar {@link Message} object associated with the specified key.
*
* <p>This method allows access to the original Pulsar message containing raw payload bytes,
* message properties, message ID, and other metadata. It is useful for scenarios where
* clients require access to the entire message beyond the deserialized value {@code T}
* provided by {@link #get(String)}.
*
* @param key the key whose associated raw message is to be returned
* @return the raw {@link Message} object associated with the key, or {@code null} if no mapping exists
*/
Message<?> getRawMessage(String key);

/**
* Returns a Set view of the mappings contained in this map.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,29 @@
*/
package org.apache.pulsar.client.impl;

import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;

import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;

@Slf4j
public class TableViewImpl<T> implements TableView<T> {

private final TableViewConfigurationData conf;

private final ConcurrentMap<String, T> data;
private final ConcurrentMap<String, Message<?>> rawMessages;
private final Map<String, T> immutableData;

private final CompletableFuture<Reader<T>> reader;
Expand Down Expand Up @@ -85,6 +74,7 @@ public class TableViewImpl<T> implements TableView<T> {
this.conf = conf;
this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString());
this.data = new ConcurrentHashMap<>();
this.rawMessages = new ConcurrentHashMap<>();
this.immutableData = Collections.unmodifiableMap(data);
this.listeners = new ArrayList<>();
this.listenersMutex = new ReentrantLock();
Expand Down Expand Up @@ -144,6 +134,11 @@ public T get(String key) {
return data.get(key);
}

@Override
public Message<?> getRawMessage(String key) {
return rawMessages.get(key);
}

@Override
public Set<Map.Entry<String, T>> entrySet() {
return immutableData.entrySet();
Expand Down Expand Up @@ -205,56 +200,68 @@ public void close() throws PulsarClientException {

private void handleMessage(Message<T> msg) {
lastReadPositions.put(msg.getTopicName(), msg.getMessageId());
try {
if (msg.hasKey()) {
String key = msg.getKey();
T cur = msg.size() > 0 ? msg.getValue() : null;
if (log.isDebugEnabled()) {
log.debug("Applying message from topic {}. key={} value={}",
conf.getTopicName(),
key,
cur);
}

boolean update = true;
if (compactionStrategy != null) {
T prev = data.get(key);
update = !compactionStrategy.shouldKeepLeft(prev, cur);
if (!update) {
log.info("Skipped the message from topic {}. key={} value={} prev={}",
conf.getTopicName(),
key,
cur,
prev);
compactionStrategy.handleSkippedMessage(key, cur);
}
}
if (!msg.hasKey()) {
log.warn("Received message with no key, releasing.");
msg.release();
checkAllFreshTask(msg);
return;
}

if (update) {
try {
listenersMutex.lock();
if (null == cur) {
data.remove(key);
} else {
data.put(key, cur);
}
String key = msg.getKey();
T cur = msg.size() > 0 ? msg.getValue() : null;
if (log.isDebugEnabled()) {
log.debug("Applying message from topic {}. key={} value={}",
conf.getTopicName(),
key,
cur);
}

for (BiConsumer<String, T> listener : listeners) {
try {
listener.accept(key, cur);
} catch (Throwable t) {
log.error("Table view listener raised an exception", t);
}
}
} finally {
listenersMutex.unlock();
}
boolean update = true;
if (compactionStrategy != null) {
T prev = data.get(key);
update = !compactionStrategy.shouldKeepLeft(prev, cur);
if (!update) {
log.info("Skipped the message from topic {}. key={} value={} prev={}",
conf.getTopicName(),
key,
cur,
prev);
compactionStrategy.handleSkippedMessage(key, cur);
msg.release();
checkAllFreshTask(msg);
return;
}
}

try {
listenersMutex.lock();
Message<?> oldRawMessage;
if (null == cur) {
data.remove(key);
oldRawMessage = rawMessages.remove(key);
msg.release();
} else {
data.put(key, cur);
oldRawMessage = rawMessages.put(key, msg);
}

if (oldRawMessage != null) {
oldRawMessage.release();
}

for (BiConsumer<String, T> listener : listeners) {
try {
listener.accept(key, cur);
} catch (Throwable t) {
log.error("Table view listener raised an exception", t);
}
}
checkAllFreshTask(msg);
} finally {
msg.release();
listenersMutex.unlock();
}

checkAllFreshTask(msg);
}

@Override
Expand Down