Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][txn]Fix TopicTransactionBuffer potential thread safety issue (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun authored Feb 29, 2024
1 parent e25c7f0 commit 74be3fd
Showing 1 changed file with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,15 @@ public void handleTxnEntry(Entry entry) {
if (msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) {
TxnID txnID = new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits());
PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
if (Markers.isTxnMarker(msgMetadata)) {
if (Markers.isTxnAbortMarker(msgMetadata)) {
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
synchronized (TopicTransactionBuffer.this) {
if (Markers.isTxnMarker(msgMetadata)) {
if (Markers.isTxnAbortMarker(msgMetadata)) {
snapshotAbortedTxnProcessor.putAbortedTxnAndPosition(txnID, position);
}
updateMaxReadPosition(txnID);
} else {
handleTransactionMessage(txnID, position);
}
updateMaxReadPosition(txnID);
} else {
handleTransactionMessage(txnID, position);
}
}
}
Expand Down Expand Up @@ -362,10 +364,10 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
updateMaxReadPosition(txnID);
snapshotAbortedTxnProcessor.trimExpiredAbortedTxns();
takeSnapshotByChangeTimes();
txnAbortedCounter.increment();
completableFuture.complete(null);
handleLowWaterMark(txnID, lowWaterMark);
}
txnAbortedCounter.increment();
completableFuture.complete(null);
handleLowWaterMark(txnID, lowWaterMark);
}

@Override
Expand Down Expand Up @@ -473,7 +475,7 @@ public CompletableFuture<Void> closeAsync() {
}

@Override
public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {
return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
}

Expand Down Expand Up @@ -510,9 +512,11 @@ public PositionImpl getMaxReadPosition() {
@Override
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
TransactionInBufferStats transactionInBufferStats = new TransactionInBufferStats();
transactionInBufferStats.aborted = isTxnAborted(txnID, null);
if (ongoingTxns.containsKey(txnID)) {
transactionInBufferStats.startPosition = ongoingTxns.get(txnID).toString();
synchronized (this) {
transactionInBufferStats.aborted = isTxnAborted(txnID, null);
if (ongoingTxns.containsKey(txnID)) {
transactionInBufferStats.startPosition = ongoingTxns.get(txnID).toString();
}
}
return transactionInBufferStats;
}
Expand Down

0 comments on commit 74be3fd

Please sign in to comment.