Skip to content

Commit 6df5fa5

Browse files
committed
[fix][broker] Fix NonDurable Subscription msgBackLog incorrect after topic unload
1 parent 9012422 commit 6df5fa5

File tree

7 files changed

+71
-12
lines changed

7 files changed

+71
-12
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,6 +1232,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
12321232
final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH;
12331233
final Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
12341234
subscribe.getSubscriptionPropertiesList());
1235+
final boolean resetIncludeHead = subscribe.isResetIncludeHead();
12351236

12361237
if (log.isDebugEnabled()) {
12371238
log.debug("Topic name = {}, subscription name = {}, schema is {}", topicName, subscriptionName,
@@ -1356,6 +1357,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
13561357
.subscriptionProperties(subscriptionProperties)
13571358
.consumerEpoch(consumerEpoch)
13581359
.schemaType(schema == null ? null : schema.getType())
1360+
.resetIncludeHead(resetIncludeHead)
13591361
.build();
13601362
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
13611363
return ignoreUnrecoverableBKException

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public class SubscriptionOption {
5151
private Optional<Map<String, String>> subscriptionProperties;
5252
private long consumerEpoch;
5353
private SchemaType schemaType;
54+
private boolean resetIncludeHead;
5455

5556
public static Optional<Map<String, String>> getPropertiesMap(List<KeyValue> list) {
5657
if (list == null) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -887,7 +887,7 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
887887
option.getInitialPosition(), option.getStartMessageRollbackDurationSec(),
888888
option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
889889
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
890-
option.getConsumerEpoch(), option.getSchemaType());
890+
option.getConsumerEpoch(), option.getSchemaType(), option.isResetIncludeHead());
891891
}
892892

893893
private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
@@ -901,7 +901,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
901901
KeySharedMeta keySharedMeta,
902902
Map<String, String> subscriptionProperties,
903903
long consumerEpoch,
904-
SchemaType schemaType) {
904+
SchemaType schemaType,
905+
boolean resetIncludeHead) {
905906
if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
906907
return FutureUtil.failedFuture(new NotAllowedException(
907908
"readCompacted only allowed on failover or exclusive subscriptions"));
@@ -984,7 +985,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
984985
? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
985986
replicatedSubscriptionState, subscriptionProperties)
986987
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
987-
startMessageRollbackDurationSec, readCompacted, subscriptionProperties);
988+
startMessageRollbackDurationSec, readCompacted, subscriptionProperties, resetIncludeHead);
988989

989990
CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
990991
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
@@ -1068,7 +1069,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
10681069
KeySharedMeta keySharedMeta) {
10691070
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
10701071
isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec,
1071-
replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null);
1072+
replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null, false);
10721073
}
10731074

10741075
private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
@@ -1134,7 +1135,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
11341135

11351136
private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
11361137
MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec,
1137-
boolean isReadCompacted, Map<String, String> subscriptionProperties) {
1138+
boolean isReadCompacted, Map<String, String> subscriptionProperties, boolean resetIncludeHead) {
11381139
log.info("[{}][{}] Creating non-durable subscription at msg id {} - {}",
11391140
topic, subscriptionName, startMessageId, subscriptionProperties);
11401141

@@ -1157,7 +1158,8 @@ private CompletableFuture<? extends Subscription> getNonDurableSubscription(Stri
11571158
long entryId = msgId.getEntryId();
11581159
// Ensure that the start message id starts from a valid entry.
11591160
if (ledgerId >= 0 && entryId >= 0
1160-
&& msgId instanceof BatchMessageIdImpl) {
1161+
&& msgId instanceof BatchMessageIdImpl
1162+
&& (msgId.getBatchIndex() >= 0 || resetIncludeHead)) {
11611163
// When the start message is relative to a batch, we need to take one step back on the previous
11621164
// message,
11631165
// because the "batch" might not have been consumed in its entirety.

pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,57 @@ public void testNonDurableSubscriptionRecovery(SubscriptionType subscriptionType
256256

257257
}
258258

259+
@Test
260+
public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception {
261+
String topicName = "persistent://my-property/my-ns/nonDurable-sub-test";
262+
String subName = "test-sub";
263+
264+
admin.topics().createNonPartitionedTopic(topicName);
265+
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
266+
267+
Consumer<byte[]> consumer = pulsarClient.newConsumer()
268+
.topic(topicName)
269+
.subscriptionName(subName)
270+
.subscriptionMode(SubscriptionMode.NonDurable).subscribe();
271+
272+
// 1. send message
273+
for (int i = 0; i < 10; i++) {
274+
String message = "my-message-" + i;
275+
producer.send(message.getBytes());
276+
}
277+
producer.close();
278+
279+
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 10);
280+
281+
// 2. receive the message
282+
Thread t = new Thread(() -> {
283+
while (true) {
284+
Message<byte[]> msg;
285+
try {
286+
msg = consumer.receive();
287+
consumer.acknowledge(msg);
288+
} catch (PulsarClientException e) {
289+
throw new RuntimeException(e);
290+
}
291+
}
292+
});
293+
t.start();
294+
295+
// 3. consumed all messages and the msgBacklog is 0
296+
Awaitility.await().untilAsserted(() ->
297+
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 0));
298+
299+
// 4. unload the topic
300+
admin.topics().unload(topicName);
301+
302+
// 5. wait the consumer reconnect
303+
Awaitility.await().until(() -> admin.topics().getStats(topicName).getSubscriptions() != null);
304+
305+
// 6. the backlog should still be 0
306+
Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).
307+
getSubscriptions().get(subName).getMsgBacklog(), 0));
308+
}
309+
259310
@Test
260311
public void testFlowCountForMultiTopics() throws Exception {
261312
String topicName = "persistent://my-property/my-ns/test-flow-count";
@@ -464,7 +515,7 @@ public void testInitReaderAtSpecifiedPosition() throws Exception {
464515
// A middle ledger id, and entry id is "-1".
465516
log.info("start test s8");
466517
String s8 = "s8";
467-
MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), 0, -1);
518+
MessageIdImpl startMessageId8 = new MessageIdImpl(ledgers.get(2), -1, -1);
468519
Reader<String> reader8 = pulsarClient.newReader(Schema.STRING).topic(topicName).subscriptionName(s8)
469520
.receiverQueueSize(0).startMessageId(startMessageId8).create();
470521
ManagedLedgerInternalStats.CursorStats cursor8 = admin.topics().getInternalStats(topicName).cursors.get(s8);
@@ -497,7 +548,7 @@ public void testInitReaderAtSpecifiedPosition() throws Exception {
497548
ManagedLedgerInternalStats.CursorStats cursor10 = admin.topics().getInternalStats(topicName).cursors.get(s10);
498549
log.info("cursor10 readPosition: {}, markDeletedPosition: {}", cursor10.readPosition, cursor10.markDeletePosition);
499550
Position p10 = parseReadPosition(cursor10);
500-
assertEquals(p10.getLedgerId(), ledgers.get(2));
551+
assertEquals(p10.getLedgerId(), ledgers.get(3));
501552
assertEquals(p10.getEntryId(), 0);
502553
reader10.close();
503554

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -895,7 +895,7 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
895895
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
896896
startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
897897
// Use the current epoch to subscribe.
898-
conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
898+
conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this), resetIncludeHead);
899899

900900
cnx.sendRequestWithId(request, requestId).thenRun(() -> {
901901
synchronized (ConsumerImpl.this) {

pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -588,15 +588,15 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
588588
return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
589589
isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition,
590590
startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null,
591-
Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH);
591+
Collections.emptyMap(), DEFAULT_CONSUMER_EPOCH, false);
592592
}
593593

594594
public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
595595
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
596596
Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
597597
InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec,
598598
SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy,
599-
Map<String, String> subscriptionProperties, long consumerEpoch) {
599+
Map<String, String> subscriptionProperties, long consumerEpoch, boolean resetIncludeHead) {
600600
BaseCommand cmd = localCmd(Type.SUBSCRIBE);
601601
CommandSubscribe subscribe = cmd.setSubscribe()
602602
.setTopic(topic)
@@ -611,7 +611,8 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
611611
.setInitialPosition(subscriptionInitialPosition)
612612
.setReplicateSubscriptionState(isReplicated)
613613
.setForceTopicCreation(createTopicIfDoesNotExist)
614-
.setConsumerEpoch(consumerEpoch);
614+
.setConsumerEpoch(consumerEpoch)
615+
.setResetIncludeHead(resetIncludeHead);
615616

616617
if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) {
617618
List<KeyValue> keyValues = new ArrayList<>();

pulsar-common/src/main/proto/PulsarApi.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,8 @@ message CommandSubscribe {
401401

402402
// The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
403403
optional uint64 consumer_epoch = 19;
404+
405+
optional bool reset_include_head = 20 [default = false];
404406
}
405407

406408
message CommandPartitionedTopicMetadata {

0 commit comments

Comments
 (0)