Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] reader stuck with marker message though hasMessageAvailable() return true #34

Closed
wants to merge 13 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ public NonRecoverableLedgerException(String msg) {
}
}

public static class NoValidEntryLedgerException extends ManagedLedgerException {
public NoValidEntryLedgerException(String msg) {
super(msg);
}
}

public static class LedgerNotExistException extends NonRecoverableLedgerException {
public LedgerNotExistException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -68,6 +69,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -91,6 +93,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
Expand Down Expand Up @@ -2190,6 +2193,78 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
}
}

private boolean isValidEntry(MessageMetadata metadata, PersistentTopic persistentTopic) {
if (metadata.hasMarkerType()) {
return false;
}
if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits() && persistentTopic.isTxnAborted(
new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits()), null)) {
return false;
}
return true;
}

/**
* Get the metadata and position of the largest batch index of the last valid entry.
* some entry need to be filtered, such as marker message.
* @param entryMetaDataFuture
* @param lastPosition
* @param ml
*/
private void readLastValidEntry(CompletableFuture<MessageMetadata> entryMetaDataFuture,
CompletableFuture<PositionImpl> lastPositionFuture,
PositionImpl lastPosition, ManagedLedgerImpl ml,
PersistentTopic persistentTopic) {
ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
if (isValidEntry(metadata, persistentTopic)) {
entryMetaDataFuture.complete(metadata);
lastPositionFuture.complete(lastPosition);
} else {
// check the previous entry
if (lastPosition.getEntryId() > 0) {
readLastValidEntry(entryMetaDataFuture, lastPositionFuture, PositionImpl.get(
lastPosition.getLedgerId(), lastPosition.getEntryId() - 1), ml, persistentTopic);
} else {
// find out the previous ledger
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgersMap =
ml.getLedgersInfo();
Long previousLedgerId = ledgersMap.lowerKey(lastPosition.getLedgerId());
if (previousLedgerId != null) {
readLastValidEntry(entryMetaDataFuture, lastPositionFuture,
PositionImpl.get(previousLedgerId, ledgersMap.
get(previousLedgerId).getEntries() - 1), ml, persistentTopic);
} else {
entryMetaDataFuture.completeExceptionally(
new ManagedLedgerException.NoValidEntryLedgerException("No valid entry found"));
lastPositionFuture.completeExceptionally(
new ManagedLedgerException.NoValidEntryLedgerException("No valid entry found"));
}
}
}
} finally {
if (entry != null) {
entry.release();
}
}
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
entryMetaDataFuture.completeExceptionally(exception);
}

@Override
public String toString() {
return String.format("ServerCnx [%s] get largest batch index when possible",
ServerCnx.this.ctx.channel());
}
}, null);
}

private void getLargestBatchIndexWhenPossible(
Topic topic,
PositionImpl lastPosition,
Expand Down Expand Up @@ -2221,55 +2296,41 @@ private void getLargestBatchIndexWhenPossible(
}

// For a valid position, we read the entry out and parse the batch size from its metadata.
CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
entryFuture.complete(entry);
CompletableFuture<MessageMetadata> entryMetaDataFuture = new CompletableFuture<>();
CompletableFuture<PositionImpl> lastPositionFuture = new CompletableFuture<>();
readLastValidEntry(entryMetaDataFuture, lastPositionFuture, lastPosition, ml, persistentTopic);

entryMetaDataFuture.thenCombine(lastPositionFuture, (metadata, position) -> {
int largestBatchIndex = -1;
if (metadata.hasNumMessagesInBatch()) {
largestBatchIndex = metadata.getNumMessagesInBatch() - 1;
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
entryFuture.completeExceptionally(exception);
}

@Override
public String toString() {
return String.format("ServerCnx [%s] get largest batch index when possible",
ServerCnx.this.ctx.channel());
if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), subscriptionName, lastPosition, partitionIndex);
}
}, null);

CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
int batchSize = metadata.getNumMessagesInBatch();
entry.release();
return metadata.hasNumMessagesInBatch() ? batchSize : -1;
});

batchSizeFuture.whenComplete((batchSize, e) -> {
if (e != null) {
if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
markDeletePosition);
} else {
writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError,
"Failed to get batch size for entry " + e.getMessage()));
}
} else {
int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;

if (log.isDebugEnabled()) {
log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
topic.getName(), subscriptionName, lastPosition, partitionIndex);
}

writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(),
lastPosition.getEntryId(), partitionIndex, largestBatchIndex,
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, position.getLedgerId(),
position.getEntryId(), partitionIndex, largestBatchIndex,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
return null;
}).exceptionally(ex1 -> {
if (ex1.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
handleLastMessageIdFromCompactionService(persistentTopic, requestId, partitionIndex,
markDeletePosition);
} else if (ex1.getCause() instanceof ManagedLedgerException.NoValidEntryLedgerException) {
// return MessageId.earliest to indicate that there is no valid messages to be read
MessageIdImpl messageId = (MessageIdImpl) MessageId.earliest;
writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId.getLedgerId(),
messageId.getEntryId(), partitionIndex, -1,
markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
} else {
writeAndFlush(Commands.newError(
requestId, ServerError.MetadataError,
"Failed to get batch size for entry " + ex1.getMessage()));
}
return null;
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -1908,4 +1909,65 @@ public void testReadCommittedWithCompaction() throws Exception{
Assert.assertEquals(result, List.of("V4", "V5", "V6"));
}

@Test
public void testReadCommitMarkerStuck() throws Exception{
final String namespace = "tnx/ns-commit-marker-stuck";
final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.topics().createNonPartitionedTopic(topic);

@Cleanup
Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
producer.newMessage(txn).key("K1").value("V1").send();
txn.commit().get();

@Cleanup
Reader<String> reader = this.pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageId(MessageId.earliest)
.create();
List<String> result = new ArrayList<>();
while (reader.hasMessageAvailable()) {
Message<String> receive = reader.readNext(2, TimeUnit.SECONDS);
// we pass the hasMessageAvailable check, but the readNext return null or stuck
assertNotEquals(receive, null);
result.add(receive.getValue());
}
}

@Test
public void testReadAbortMarkerStuck() throws Exception{
final String namespace = "tnx/ns-abort-marker-stuck";
final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.topics().createNonPartitionedTopic(topic);

@Cleanup
Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
producer.newMessage(txn).key("K1").value("V1").send();
txn.abort().get();

@Cleanup
Reader<String> reader = this.pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageId(MessageId.earliest)
.create();
List<String> result = new ArrayList<>();
while (reader.hasMessageAvailable()) {
Message<String> receive = reader.readNext(2, TimeUnit.SECONDS);
// we pass the hasMessageAvailable check, but the readNext return null or stuck
assertNotEquals(receive, null);
result.add(receive.getValue());
}
}
}
Loading