Skip to content

Commit c93da6e

Browse files
BewareMyPowerlhotari
authored andcommitted
[fix][client] Fix wrong start message id when it's a chunked message id (#23713)
(cherry picked from commit 4606385)
1 parent 78274a7 commit c93da6e

File tree

2 files changed

+23
-10
lines changed

2 files changed

+23
-10
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java

+16-9
Original file line numberDiff line numberDiff line change
@@ -543,8 +543,12 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
543543
clientBuilder.memoryLimit(10000L, SizeUnit.BYTES);
544544
}
545545

546+
interface ThrowingBiConsumer<T, U> {
547+
void accept(T t, U u) throws Exception;
548+
}
549+
546550
@Test
547-
public void testSeekChunkMessages() throws PulsarClientException {
551+
public void testSeekChunkMessages() throws Exception {
548552
log.info("-- Starting {} test --", methodName);
549553
this.conf.setMaxMessageSize(50);
550554
final int totalMessages = 5;
@@ -594,14 +598,17 @@ public void testSeekChunkMessages() throws PulsarClientException {
594598
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
595599
}
596600

597-
Reader<byte[]> reader = pulsarClient.newReader()
598-
.topic(topicName)
599-
.startMessageIdInclusive()
600-
.startMessageId(msgIds.get(1))
601-
.create();
602-
603-
Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
604-
assertEquals(msgIds.get(1), readMsg.getMessageId());
601+
ThrowingBiConsumer<Boolean, MessageId> assertStartMessageId = (inclusive, expectedFirstMsgId) -> {
602+
final var builder = pulsarClient.newReader().topic(topicName).startMessageId(msgIds.get(1));
603+
if (inclusive) {
604+
builder.startMessageIdInclusive();
605+
}
606+
@Cleanup final var reader = builder.create();
607+
final var readMsg = reader.readNext(5, TimeUnit.SECONDS);
608+
assertEquals(expectedFirstMsgId, readMsg.getMessageId());
609+
};
610+
assertStartMessageId.accept(true, msgIds.get(1));
611+
assertStartMessageId.accept(false, msgIds.get(2));
605612

606613
consumer1.close();
607614
consumer2.close();

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,13 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
276276
this.subscriptionMode = conf.getSubscriptionMode();
277277
if (startMessageId != null) {
278278
MessageIdAdv firstChunkMessageId = ((MessageIdAdv) startMessageId).getFirstChunkMessageId();
279-
this.startMessageId = (firstChunkMessageId == null) ? (MessageIdAdv) startMessageId : firstChunkMessageId;
279+
if (conf.isResetIncludeHead() && firstChunkMessageId != null) {
280+
// The chunk message id's ledger id and entry id are the last chunk's ledger id and entry id, when
281+
// startMessageIdInclusive() is enabled, we need to start from the first chunk's message id
282+
this.startMessageId = firstChunkMessageId;
283+
} else {
284+
this.startMessageId = (MessageIdAdv) startMessageId;
285+
}
280286
}
281287
this.initialStartMessageId = this.startMessageId;
282288
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;

0 commit comments

Comments
 (0)