From 74be3fd4917a2327f2da9b5b55cc572b3c1f4e84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=81=93=E5=90=9B?= Date: Thu, 29 Feb 2024 21:22:03 +0800 Subject: [PATCH] [fix][txn]Fix TopicTransactionBuffer potential thread safety issue (#22149) --- .../buffer/impl/TopicTransactionBuffer.java | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 5392e473947e6..a36216bd6258b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -170,13 +170,15 @@ public void handleTxnEntry(Entry entry) { if (msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) { TxnID txnID = new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()); PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId()); - if (Markers.isTxnMarker(msgMetadata)) { - if (Markers.isTxnAbortMarker(msgMetadata)) { - snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position); + synchronized (TopicTransactionBuffer.this) { + if (Markers.isTxnMarker(msgMetadata)) { + if (Markers.isTxnAbortMarker(msgMetadata)) { + snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position); + } + updateMaxReadPosition(txnID); + } else { + handleTransactionMessage(txnID, position); } - updateMaxReadPosition(txnID); - } else { - handleTransactionMessage(txnID, position); } } } @@ -362,10 +364,10 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { updateMaxReadPosition(txnID); snapshotAbortedTxnProcessor.trimExpiredAbortedTxns(); takeSnapshotByChangeTimes(); + txnAbortedCounter.increment(); + completableFuture.complete(null); + handleLowWaterMark(txnID, lowWaterMark); } - txnAbortedCounter.increment(); - completableFuture.complete(null); - handleLowWaterMark(txnID, lowWaterMark); } @Override @@ -473,7 +475,7 @@ public CompletableFuture closeAsync() { } @Override - public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) { + public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) { return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID); } @@ -510,9 +512,11 @@ public PositionImpl getMaxReadPosition() { @Override public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) { TransactionInBufferStats transactionInBufferStats = new TransactionInBufferStats(); - transactionInBufferStats.aborted = isTxnAborted(txnID, null); - if (ongoingTxns.containsKey(txnID)) { - transactionInBufferStats.startPosition = ongoingTxns.get(txnID).toString(); + synchronized (this) { + transactionInBufferStats.aborted = isTxnAborted(txnID, null); + if (ongoingTxns.containsKey(txnID)) { + transactionInBufferStats.startPosition = ongoingTxns.get(txnID).toString(); + } } return transactionInBufferStats; }