Skip to content

Commit

Permalink
fix conflict.
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed Dec 5, 2024
1 parent 04cec0f commit d791ecd
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.HashSet;
Expand Down Expand Up @@ -311,19 +312,64 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
// negative topic message id
consumer.negativeAcknowledge(topicMessageId);
NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker();
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
// negative batch message id
unAckedMessageTracker.add(messageId);
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
}

/**
 * Checks that messages belonging to the same batch, when negatively acknowledged at different moments,
 * are each redelivered after their own redelivery delay. This guards against every message of the batch
 * being redelivered at the same time.
 *
 * <p>Two messages are published in a single batch. The first is nacked right away and the second one 2s
 * later; with a 3s redelivery delay the first message is expected back roughly 1s after the second nack,
 * so a receive with a 2s timeout must return a message.
 *
 * @throws Exception if the fixture cleanup/setup, the client calls, or the sleep fail
 */
@Test
public void testNegativeAcksWithBatch() throws Exception {
    // the configuration flag is flipped between cleanup() and setup()
    cleanup();
    conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
    setup();
    final String topicName = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch");

    @Cleanup
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topicName)
            .subscriptionName("sub1")
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
            .subscriptionType(SubscriptionType.Shared)
            .enableBatchIndexAcknowledgment(true)
            .negativeAckRedeliveryDelay(3, TimeUnit.SECONDS)
            .subscribe();

    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topicName)
            .enableBatching(true)
            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
            .batchingMaxMessages(2)
            .create();

    // publish "test-0" and "test-1" so that both end up in the same batch
    for (int i = 0; i < 2; i++) {
        producer.sendAsync("test-" + i);
    }
    producer.flush();

    // nack the first message immediately
    Message<String> firstReceived = consumer.receive();
    consumer.negativeAcknowledge(firstReceived);

    // let 2s elapse before nacking the second message
    TimeUnit.SECONDS.sleep(2);
    Message<String> secondReceived = consumer.receive();
    consumer.negativeAcknowledge(secondReceived);

    // 2s of the 3s delay are already spent, so the first message is due about 1s from now
    Message<String> redelivered = consumer.receive(2, TimeUnit.SECONDS);
    assertNotNull(redelivered);
}

@Test
public void testNegativeAcksWithBatchAckEnabled() throws Exception {
cleanup();
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@
<include>org.reactivestreams:reactive-streams</include>
<include>org.tukaani:xz</include>
<include>org.yaml:snakeyaml</include>
<include>it.unimi.dsi:fastutil</include>
</includes>
<excludes>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@
<include>org.reactivestreams:reactive-streams</include>
<include>org.tukaani:xz</include>
<include>org.yaml:snakeyaml</include>
<include>it.unimi.dsi:fastutil</include>
</includes>
<excludes>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
Expand Down
10 changes: 10 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,16 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2745,7 +2745,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
int messagesFromQueue = 0;
Message<T> peek = incomingMessages.peek();
if (peek != null) {
MessageIdAdv messageId = MessageIdAdvUtils.discardBatch(peek.getMessageId());
MessageId messageId = NegativeAcksTracker.discardBatchAndPartitionIndex(peek.getMessageId());
if (!messageIds.contains(messageId)) {
// first message is not expired, then no message is expired in queue.
return 0;
Expand All @@ -2756,7 +2756,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
while (message != null) {
decreaseIncomingMessageSize(message);
messagesFromQueue++;
MessageIdAdv id = MessageIdAdvUtils.discardBatch(message.getMessageId());
MessageId id = NegativeAcksTracker.discardBatchAndPartitionIndex(message.getMessageId());
if (!messageIds.contains(id)) {
messageIds.add(id);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,48 +24,50 @@
import io.netty.util.Timer;
import java.io.Closeable;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NegativeAcksTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);

private ConcurrentLongLongPairHashMap nackedMessages = null;
// timestamp -> ledgerId -> entryId. There is no need to track the batch index; if different messages
// have different timestamps, there will be multiple entries in the map.
// Long2ObjectRBTreeMap -> Long2ObjectOpenHashMap -> Roaring64Bitmap
private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = null;

private final ConsumerBase<?> consumer;
private final Timer timer;
private final long nackDelayNanos;
private final long timerIntervalNanos;
private final long nackDelayMs;
private final RedeliveryBackoff negativeAckRedeliveryBackoff;
private final int negativeAckPrecisionBitCnt;

private Timeout timeout;

// Set a min delay to allow for grouping nacks within a single batch
private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long.MAX_VALUE;
private static final long MIN_NACK_DELAY_MS = 100;
private static final int DUMMY_PARTITION_INDEX = -2;

public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
this.consumer = consumer;
this.timer = consumer.getClient().timer();
this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
MIN_NACK_DELAY_NANOS);
this.nackDelayMs = Math.max(TimeUnit.MICROSECONDS.toMillis(conf.getNegativeAckRedeliveryDelayMicros()),
MIN_NACK_DELAY_MS);
this.negativeAckRedeliveryBackoff = conf.getNegativeAckRedeliveryBackoff();
if (negativeAckRedeliveryBackoff != null) {
this.timerIntervalNanos = Math.max(
TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(0)),
MIN_NACK_DELAY_NANOS) / 3;
} else {
this.timerIntervalNanos = nackDelayNanos / 3;
}
this.negativeAckPrecisionBitCnt = conf.getNegativeAckPrecisionBitCnt();
}

private void triggerRedelivery(Timeout t) {
Expand All @@ -76,21 +78,48 @@ private void triggerRedelivery(Timeout t) {
return;
}

long now = System.nanoTime();
nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
if (timestamp < now) {
MessageId msgId = new MessageIdImpl(ledgerId, entryId,
// need to convert non-partitioned topic partition index to -1
(int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
long currentTimestamp = System.currentTimeMillis();
for (long timestamp : nackedMessages.keySet()) {
if (timestamp > currentTimestamp) {
// We are done with all the messages that need to be redelivered
break;
}

Long2ObjectMap<Roaring64Bitmap> ledgerMap = nackedMessages.get(timestamp);
for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
long ledgerId = ledgerEntry.getLongKey();
Roaring64Bitmap entrySet = ledgerEntry.getValue();
entrySet.forEach(entryId -> {
MessageId msgId = new MessageIdImpl(ledgerId, entryId, DUMMY_PARTITION_INDEX);
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
});
}
}

// remove entries from the nackedMessages map
LongBidirectionalIterator iterator = nackedMessages.keySet().iterator();
while (iterator.hasNext()) {
long timestamp = iterator.nextLong();
if (timestamp <= currentTimestamp) {
iterator.remove();
} else {
break;
}
}

// Schedule the next redelivery if there are still messages to redeliver
if (!nackedMessages.isEmpty()) {
long nextTriggerTimestamp = nackedMessages.firstLongKey();
long delayMs = Math.max(nextTriggerTimestamp - currentTimestamp, 0);
if (delayMs > 0) {
this.timeout = timer.newTimeout(this::triggerRedelivery, delayMs, TimeUnit.MILLISECONDS);
} else {
this.timeout = timer.newTimeout(this::triggerRedelivery, 0, TimeUnit.MILLISECONDS);
}
});
for (MessageId messageId : messagesToRedeliver) {
nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(),
((MessageIdImpl) messageId).getEntryId());
} else {
this.timeout = null;
}
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
}

// release the lock of NegativeAcksTracker before calling consumer.redeliverUnacknowledgedMessages,
Expand All @@ -110,39 +139,56 @@ public synchronized void add(Message<?> message) {
add(message.getMessageId(), message.getRedeliveryCount());
}

/**
 * Clears the lowest {@code bits} bits of {@code timestamp}, i.e. rounds it down to a multiple of
 * {@code 2^bits}.
 *
 * <p>As with every Java {@code long} shift, only the low six bits of {@code bits} are used as the
 * shift distance.
 *
 * @param timestamp the value whose low-order bits are cleared
 * @param bits the number of low-order bits to clear
 * @return {@code timestamp} with its lowest {@code bits} bits set to zero
 */
static long trimLowerBit(long timestamp, int bits) {
    // mask with ones in exactly the positions that must be dropped
    final long lowBitsMask = (1L << bits) - 1;
    return timestamp & ~lowBitsMask;
}

private synchronized void add(MessageId messageId, int redeliveryCount) {
if (nackedMessages == null) {
nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
.autoShrink(true)
.concurrencyLevel(1)
.build();
nackedMessages = new Long2ObjectRBTreeMap<>();
}

long backoffNs;
long backoffMs;
if (negativeAckRedeliveryBackoff != null) {
backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
backoffMs = TimeUnit.MILLISECONDS.toMillis(negativeAckRedeliveryBackoff.next(redeliveryCount));
} else {
backoffNs = nackDelayNanos;
backoffMs = nackDelayMs;
}
MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
// ConcurrentLongLongPairHashMap requires the key and value >=0.
// partitionIndex is -1 if the message is from a non-partitioned topic, but we don't use
// partitionIndex actually, so we can set it to Long.MAX_VALUE in the case of non-partitioned topic to
// avoid exception from ConcurrentLongLongPairHashMap.
nackedMessages.put(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(),
messageIdAdv.getPartitionIndex() >= 0 ? messageIdAdv.getPartitionIndex() :
NON_PARTITIONED_TOPIC_PARTITION_INDEX, System.nanoTime() + backoffNs);
MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
long timestamp = trimLowerBit(System.currentTimeMillis() + backoffMs, negativeAckPrecisionBitCnt);
nackedMessages.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>())
.computeIfAbsent(messageIdAdv.getLedgerId(), k -> new Roaring64Bitmap())
.add(messageIdAdv.getEntryId());

if (this.timeout == null) {
// Schedule a task and group all the redeliveries for the same period. Leave a small buffer so that a
// nack immediately following the current one is batched into the same redelivery request.
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
this.timeout = timer.newTimeout(this::triggerRedelivery, backoffMs, TimeUnit.MILLISECONDS);
}
}

/**
 * Strips the batch index and the partition index from a message id.
 *
 * <p>A {@link ChunkMessageIdImpl} is handed back unchanged. Any other id is copied into a new
 * {@link MessageIdImpl} that keeps only the ledger id and entry id and carries
 * {@code DUMMY_PARTITION_INDEX} as its partition index.
 *
 * @param messageId the message id to normalize; it is cast to {@link MessageIdAdv}
 * @return the same instance for a chunk message id, otherwise a new {@link MessageIdImpl} holding the
 *         ledger id, the entry id and the dummy partition index
 */
public static MessageIdAdv discardBatchAndPartitionIndex(MessageId messageId) {
    final MessageIdAdv advId = (MessageIdAdv) messageId;
    return messageId instanceof ChunkMessageIdImpl
            ? advId
            : new MessageIdImpl(advId.getLedgerId(), advId.getEntryId(), DUMMY_PARTITION_INDEX);
}

@VisibleForTesting
Optional<Long> getNackedMessagesCount() {
return Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
synchronized long getNackedMessagesCount() {
if (nackedMessages == null) {
return 0;
}
return nackedMessages.values().stream().mapToLong(
ledgerMap -> ledgerMap.values().stream().mapToLong(
Roaring64Bitmap::getLongCardinality).sum()).sum();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
)
private long negativeAckRedeliveryDelayMicros = TimeUnit.MINUTES.toMicros(1);

// Number of low-order bits trimmed from the redelivery timestamp of a negatively acknowledged message;
// a value of k buckets redelivery times by 2^k ms. Defaults to 8.
// Each concatenated fragment ends with a space (or "\n") so the words do not run together in the
// rendered description.
@ApiModelProperty(
        name = "negativeAckPrecisionBitCnt",
        value = "The redelivery time precision bit count. The lower bits of the redelivery time will be "
                + "trimmed to reduce the memory occupation.\nThe default value is 8, which means the "
                + "redelivery time will be bucketed by 256ms, the redelivery time could be earlier(no later) "
                + "than the expected time, but no more than 256ms. \nIf set to k, the redelivery time will be "
                + "bucketed by 2^k ms.\nIf the value is 0, the redelivery time will be accurate to ms."
)
private int negativeAckPrecisionBitCnt = 8;

@ApiModelProperty(
name = "maxTotalReceiverQueueSizeAcrossPartitions",
value = "The max total receiver queue size across partitions.\n"
Expand Down

0 comments on commit d791ecd

Please sign in to comment.