diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index b372ecabc5de4..f8bc30f09667c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import java.util.HashSet;
@@ -311,7 +312,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
         // negative topic message id
         consumer.negativeAcknowledge(topicMessageId);
         NegativeAcksTracker negativeAcksTracker = consumer.getNegativeAcksTracker();
-        assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
+        assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
         // negative batch message id
@@ -319,11 +320,56 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
         consumer.negativeAcknowledge(batchMessageId);
         consumer.negativeAcknowledge(batchMessageId2);
         consumer.negativeAcknowledge(batchMessageId3);
-        assertEquals(negativeAcksTracker.getNackedMessagesCount().orElse((long) -1).longValue(), 1L);
+        assertEquals(negativeAcksTracker.getNackedMessagesCount(), 1L);
         assertEquals(unAckedMessageTracker.size(), 0);
         negativeAcksTracker.close();
     }
 
+    /**
+     * If we nack multiple messages in the same batch with different redelivery delays, each message should ideally
+     * be redelivered with its own delay. However, all messages in the batch are redelivered at the same time,
+     * because the tracker only records the batch's (ledgerId, entryId).
+     * @throws Exception
+     */
+    @Test
+    public void testNegativeAcksWithBatch() throws Exception {
+        cleanup();
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+        setup();
+        String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithBatch");
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .negativeAckRedeliveryDelay(3, TimeUnit.SECONDS)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .batchingMaxMessages(2)
+                .create();
+        // send two messages in the same batch
+        producer.sendAsync("test-0");
+        producer.sendAsync("test-1");
+        producer.flush();
+
+        // negative ack the first message
+        consumer.negativeAcknowledge(consumer.receive());
+        // wait for 2s, negative ack the second message
+        Thread.sleep(2000);
+        consumer.negativeAcknowledge(consumer.receive());
+
+        // now 2s has passed, the first message should be redelivered 1s later.
+        Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
+        assertNotNull(msg1);
+    }
+
     @Test
     public void testNegativeAcksWithBatchAckEnabled() throws Exception {
         cleanup();
diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index 74007745c70ee..f226ac57588fd 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -200,6 +200,7 @@
                   <include>org.reactivestreams:reactive-streams</include>
                   <include>org.tukaani:xz</include>
                   <include>org.yaml:snakeyaml</include>
+                  <include>it.unimi.dsi:fastutil</include>
                   <include>com.fasterxml.jackson.core:jackson-annotations</include>
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index 1093b405731ea..47c6ac835054f 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -164,6 +164,7 @@
                   <include>org.reactivestreams:reactive-streams</include>
                   <include>org.tukaani:xz</include>
                   <include>org.yaml:snakeyaml</include>
+                  <include>it.unimi.dsi:fastutil</include>
                   <include>com.fasterxml.jackson.core:jackson-annotations</include>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 49bb3c6490ae9..e1a70ed074833 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -207,6 +207,16 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+    </dependency>
+
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d2753856264fc..a7eb89bda157f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2745,7 +2745,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
         int messagesFromQueue = 0;
         Message<T> peek = incomingMessages.peek();
         if (peek != null) {
-            MessageIdAdv messageId = MessageIdAdvUtils.discardBatch(peek.getMessageId());
+            MessageId messageId = NegativeAcksTracker.discardBatchAndPartitionIndex(peek.getMessageId());
             if (!messageIds.contains(messageId)) {
                 // first message is not expired, then no message is expired in queue.
                 return 0;
@@ -2756,7 +2756,7 @@ private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
         while (message != null) {
             decreaseIncomingMessageSize(message);
             messagesFromQueue++;
-            MessageIdAdv id = MessageIdAdvUtils.discardBatch(message.getMessageId());
+            MessageId id = NegativeAcksTracker.discardBatchAndPartitionIndex(message.getMessageId());
             if (!messageIds.contains(id)) {
                 messageIds.add(id);
                 break;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 5256ebf04f43c..8252f2d5ab8d5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -24,48 +24,50 @@
 import io.netty.util.Timer;
 import java.io.Closeable;
 import java.util.HashSet;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
+import it.unimi.dsi.fastutil.longs.LongBidirectionalIterator;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.RedeliveryBackoff;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class NegativeAcksTracker implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);
 
-    private ConcurrentLongLongPairHashMap nackedMessages = null;
+    // timestamp -> ledgerId -> entryId; the batch index is not needed. If different messages have
+    // different timestamps, there will be multiple entries in the map.
+    // Long2ObjectRBTreeMap -> Long2ObjectOpenHashMap -> Roaring64Bitmap
+    private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = null;
 
     private final ConsumerBase<?> consumer;
     private final Timer timer;
-    private final long nackDelayNanos;
-    private final long timerIntervalNanos;
+    private final long nackDelayMs;
     private final RedeliveryBackoff negativeAckRedeliveryBackoff;
+    private final int negativeAckPrecisionBitCnt;
 
     private Timeout timeout;
 
     // Set a min delay to allow for grouping nacks within a single batch
-    private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
-    private static final long NON_PARTITIONED_TOPIC_PARTITION_INDEX = Long.MAX_VALUE;
+    private static final long MIN_NACK_DELAY_MS = 100;
+    private static final int DUMMY_PARTITION_INDEX = -2;
 
     public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
         this.consumer = consumer;
         this.timer = consumer.getClient().timer();
-        this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
-                MIN_NACK_DELAY_NANOS);
+        this.nackDelayMs = Math.max(TimeUnit.MICROSECONDS.toMillis(conf.getNegativeAckRedeliveryDelayMicros()),
+                MIN_NACK_DELAY_MS);
         this.negativeAckRedeliveryBackoff = conf.getNegativeAckRedeliveryBackoff();
-        if (negativeAckRedeliveryBackoff != null) {
-            this.timerIntervalNanos = Math.max(
-                    TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(0)),
-                    MIN_NACK_DELAY_NANOS) / 3;
-        } else {
-            this.timerIntervalNanos = nackDelayNanos / 3;
-        }
+        this.negativeAckPrecisionBitCnt = conf.getNegativeAckPrecisionBitCnt();
     }
 
     private void triggerRedelivery(Timeout t) {
@@ -76,21 +78,48 @@ private void triggerRedelivery(Timeout t) {
                 return;
             }
 
-            long now = System.nanoTime();
-            nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> {
-                if (timestamp < now) {
-                    MessageId msgId = new MessageIdImpl(ledgerId, entryId,
-                            // need to covert non-partitioned topic partition index to -1
-                            (int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex));
-                    addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
-                    messagesToRedeliver.add(msgId);
+            long currentTimestamp = System.currentTimeMillis();
+            for (long timestamp : nackedMessages.keySet()) {
+                if (timestamp > currentTimestamp) {
+                    // We are done with all the messages that need to be redelivered
+                    break;
+                }
+
+                Long2ObjectMap<Roaring64Bitmap> ledgerMap = nackedMessages.get(timestamp);
+                for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
+                    long ledgerId = ledgerEntry.getLongKey();
+                    Roaring64Bitmap entrySet = ledgerEntry.getValue();
+                    entrySet.forEach(entryId -> {
+                        MessageId msgId = new MessageIdImpl(ledgerId, entryId, DUMMY_PARTITION_INDEX);
+                        addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
+                        messagesToRedeliver.add(msgId);
+                    });
+                }
+            }
+
+            // remove entries from the nackedMessages map
+            LongBidirectionalIterator iterator = nackedMessages.keySet().iterator();
+            while (iterator.hasNext()) {
+                long timestamp = iterator.nextLong();
+                if (timestamp <= currentTimestamp) {
+                    iterator.remove();
+                } else {
+                    break;
+                }
+            }
+
+            // Schedule the next redelivery if there are still messages to redeliver
+            if (!nackedMessages.isEmpty()) {
+                long nextTriggerTimestamp = nackedMessages.firstLongKey();
+                long delayMs = Math.max(nextTriggerTimestamp - currentTimestamp, 0);
+                if (delayMs > 0) {
+                    this.timeout = timer.newTimeout(this::triggerRedelivery, delayMs, TimeUnit.MILLISECONDS);
+                } else {
+                    this.timeout = timer.newTimeout(this::triggerRedelivery, 0, TimeUnit.MILLISECONDS);
                 }
-            });
-            for (MessageId messageId : messagesToRedeliver) {
-                nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(),
-                        ((MessageIdImpl) messageId).getEntryId());
+            } else {
+                this.timeout = null;
             }
-            this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
         }
 
         // release the lock of NegativeAcksTracker before calling consumer.redeliverUnacknowledgedMessages,
@@ -110,39 +139,56 @@ public synchronized void add(Message<?> message) {
         add(message.getMessageId(), message.getRedeliveryCount());
     }
 
+    static long trimLowerBit(long timestamp, int bits) {
+        return timestamp & (-1L << bits);
+    }
+
     private synchronized void add(MessageId messageId, int redeliveryCount) {
         if (nackedMessages == null) {
-            nackedMessages = ConcurrentLongLongPairHashMap.newBuilder()
-                    .autoShrink(true)
-                    .concurrencyLevel(1)
-                    .build();
+            nackedMessages = new Long2ObjectRBTreeMap<>();
         }
 
-        long backoffNs;
+        long backoffMs;
         if (negativeAckRedeliveryBackoff != null) {
-            backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
+            backoffMs = TimeUnit.MILLISECONDS.toMillis(negativeAckRedeliveryBackoff.next(redeliveryCount));
         } else {
-            backoffNs = nackDelayNanos;
+            backoffMs = nackDelayMs;
         }
-        MessageIdAdv messageIdAdv = MessageIdAdvUtils.discardBatch(messageId);
-        // ConcurrentLongLongPairHashMap requires the key and value >=0.
-        // partitionIndex is -1 if the message is from a non-partitioned topic, but we don't use
-        // partitionIndex actually, so we can set it to Long.MAX_VALUE in the case of non-partitioned topic to
-        // avoid exception from ConcurrentLongLongPairHashMap.
-        nackedMessages.put(messageIdAdv.getLedgerId(), messageIdAdv.getEntryId(),
-                messageIdAdv.getPartitionIndex() >= 0 ? messageIdAdv.getPartitionIndex() :
-                        NON_PARTITIONED_TOPIC_PARTITION_INDEX, System.nanoTime() + backoffNs);
+        MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
+        long timestamp = trimLowerBit(System.currentTimeMillis() + backoffMs, negativeAckPrecisionBitCnt);
+        nackedMessages.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>())
+                .computeIfAbsent(messageIdAdv.getLedgerId(), k -> new Roaring64Bitmap())
+                .add(messageIdAdv.getEntryId());
 
         if (this.timeout == null) {
             // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for
             // nack immediately following the current one will be batched into the same redeliver request.
-            this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
+            this.timeout = timer.newTimeout(this::triggerRedelivery, backoffMs, TimeUnit.MILLISECONDS);
         }
     }
 
+    /**
+     * Discard the batch index and the partition index from the message id.
+     *
+     * @param messageId the message id to normalize
+     * @return a message id keyed only by ledger id and entry id
+     */
+    public static MessageIdAdv discardBatchAndPartitionIndex(MessageId messageId) {
+        if (messageId instanceof ChunkMessageIdImpl) {
+            return (MessageIdAdv) messageId;
+        }
+        MessageIdAdv msgId = (MessageIdAdv) messageId;
+        return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), DUMMY_PARTITION_INDEX);
+    }
+
     @VisibleForTesting
-    Optional<Long> getNackedMessagesCount() {
-        return Optional.ofNullable(nackedMessages).map(ConcurrentLongLongPairHashMap::size);
+    synchronized long getNackedMessagesCount() {
+        if (nackedMessages == null) {
+            return 0;
+        }
+        return nackedMessages.values().stream().mapToLong(
+                ledgerMap -> ledgerMap.values().stream().mapToLong(
+                        Roaring64Bitmap::getLongCardinality).sum()).sum();
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index f9ff5913f62da..f430371d37c75 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -155,6 +155,16 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
     )
     private long negativeAckRedeliveryDelayMicros = TimeUnit.MINUTES.toMicros(1);
 
+    @ApiModelProperty(
+            name = "negativeAckPrecisionBitCnt",
+            value = "The redelivery time precision bit count. The lower bits of the redelivery time will be "
+                    + "trimmed to reduce the memory occupation.\nThe default value is 8, which means the "
+                    + "redelivery time will be bucketed by 256ms; the actual redelivery time can be earlier "
+                    + "(but never later) than the expected time, by at most 256ms.\nIf set to k, the redelivery "
+                    + "time will be bucketed by 2^k ms.\nIf the value is 0, the redelivery time will be accurate "
+                    + "to the millisecond."
+    )
+    private int negativeAckPrecisionBitCnt = 8;
+
     @ApiModelProperty(
             name = "maxTotalReceiverQueueSizeAcrossPartitions",
             value = "The max total receiver queue size across partitions.\n"
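For context on the new negativeAckPrecisionBitCnt setting, the standalone sketch below (illustrative only, not part of the patch; the class name and timestamp values are made up) shows the bucketing arithmetic used by the trimLowerBit helper: with the default of 8 bits, redelivery deadlines are rounded down to 256 ms boundaries, so nacks whose deadlines fall close together usually share one sorted-map entry and one timer deadline.

// Standalone sketch, not part of the patch. Mirrors NegativeAcksTracker.trimLowerBit.
public class TrimLowerBitExample {

    // Clear the lowest `bits` bits, i.e. round the timestamp down to a multiple of 2^bits ms.
    static long trimLowerBit(long timestampMs, int bits) {
        return timestampMs & (-1L << bits);
    }

    public static void main(String[] args) {
        int bits = 8;                    // default negativeAckPrecisionBitCnt -> 256 ms buckets
        long t1 = 1_700_000_000_100L;    // first nack deadline (hypothetical epoch millis)
        long t2 = t1 + 100;              // second nack deadline 100 ms later

        long bucket1 = trimLowerBit(t1, bits);
        long bucket2 = trimLowerBit(t2, bits);

        // Here both deadlines collapse into the same 256 ms bucket, so the tracker
        // would keep a single map entry (and schedule a single timer tick) for them.
        System.out.printf("bucket1=%d, bucket2=%d, same bucket=%b%n",
                bucket1, bucket2, bucket1 == bucket2);

        // With bits = 0 every millisecond is its own bucket, i.e. the exact redelivery time is kept.
        System.out.println(trimLowerBit(t1, 0) == t1);
    }
}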