Skip to content

Commit

Permalink
[fix][client] Fix wrong start message id when it's a chunked message …
Browse files Browse the repository at this point in the history
…id (#23713)
  • Loading branch information
BewareMyPower authored Dec 13, 2024
1 parent 8e80f88 commit 4606385
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,12 @@ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.memoryLimit(10000L, SizeUnit.BYTES);
}

interface ThrowingBiConsumer<T, U> {
void accept(T t, U u) throws Exception;
}

@Test
public void testSeekChunkMessages() throws PulsarClientException {
public void testSeekChunkMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
this.conf.setMaxMessageSize(50);
final int totalMessages = 5;
Expand Down Expand Up @@ -612,14 +616,17 @@ public void testSeekChunkMessages() throws PulsarClientException {
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
}

Reader<byte[]> reader = pulsarClient.newReader()
.topic(topicName)
.startMessageIdInclusive()
.startMessageId(msgIds.get(1))
.create();

Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
assertEquals(msgIds.get(1), readMsg.getMessageId());
ThrowingBiConsumer<Boolean, MessageId> assertStartMessageId = (inclusive, expectedFirstMsgId) -> {
final var builder = pulsarClient.newReader().topic(topicName).startMessageId(msgIds.get(1));
if (inclusive) {
builder.startMessageIdInclusive();
}
@Cleanup final var reader = builder.create();
final var readMsg = reader.readNext(5, TimeUnit.SECONDS);
assertEquals(expectedFirstMsgId, readMsg.getMessageId());
};
assertStartMessageId.accept(true, msgIds.get(1));
assertStartMessageId.accept(false, msgIds.get(2));

consumer1.close();
consumer2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,13 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
this.subscriptionMode = conf.getSubscriptionMode();
if (startMessageId != null) {
MessageIdAdv firstChunkMessageId = ((MessageIdAdv) startMessageId).getFirstChunkMessageId();
this.startMessageId = (firstChunkMessageId == null) ? (MessageIdAdv) startMessageId : firstChunkMessageId;
if (conf.isResetIncludeHead() && firstChunkMessageId != null) {
// The chunk message id's ledger id and entry id are the last chunk's ledger id and entry id, when
// startMessageIdInclusive() is enabled, we need to start from the first chunk's message id
this.startMessageId = firstChunkMessageId;
} else {
this.startMessageId = (MessageIdAdv) startMessageId;
}
}
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
Expand Down

0 comments on commit 4606385

Please sign in to comment.