diff --git a/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AbstractJdbcConnectionProxy.java b/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AbstractJdbcConnectionProxy.java index e5fe41a2..edff623f 100644 --- a/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AbstractJdbcConnectionProxy.java +++ b/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AbstractJdbcConnectionProxy.java @@ -14,9 +14,9 @@ import java.sql.SQLException; import java.sql.Savepoint; import java.sql.Statement; -import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.Iterator; -import java.util.List; +import java.util.Set; import com.atomikos.logging.Logger; import com.atomikos.logging.LoggerFactory; @@ -27,7 +27,7 @@ public abstract class AbstractJdbcConnectionProxy extends DynamicProxySupport statements = new ArrayList(); + private Set statements = new LinkedHashSet(); public AbstractJdbcConnectionProxy(Connection delegate) { super(delegate); @@ -63,7 +63,6 @@ private S createProxyStatement(S s) { public PreparedStatement prepareStatement(String sql) throws SQLException { updateTransactionContext(); PreparedStatement s = this.delegate.prepareStatement(sql); - addStatement(s); return createProxyStatement(s); } @@ -71,7 +70,6 @@ public PreparedStatement prepareStatement(String sql) throws SQLException { public CallableStatement prepareCall(String sql) throws SQLException { updateTransactionContext(); CallableStatement s = this.delegate.prepareCall(sql); - addStatement(s); return createProxyStatement(s); } @@ -80,7 +78,6 @@ public CallableStatement prepareCall(String sql) throws SQLException { public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { updateTransactionContext(); Statement s = this.delegate.createStatement(resultSetType, resultSetConcurrency); - addStatement(s); return createProxyStatement(s); } @@ -89,7 +86,6 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res throws SQLException { updateTransactionContext(); PreparedStatement s = this.delegate.prepareStatement(sql, resultSetType, resultSetConcurrency); - addStatement(s); return createProxyStatement(s); } @@ -97,7 +93,6 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { updateTransactionContext(); CallableStatement s = this.delegate.prepareCall(sql, resultSetType, resultSetConcurrency); - addStatement(s); return createProxyStatement(s); } @@ -106,7 +101,6 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency, in throws SQLException { updateTransactionContext(); Statement s = this.delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); - addStatement(s); return createProxyStatement(s); } @@ -115,7 +109,6 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency, in public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { updateTransactionContext(); PreparedStatement s = this.delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); - addStatement(s); return createProxyStatement(s); } @@ -124,7 +117,6 @@ public CallableStatement prepareCall(String sql, int resultSetType, int resultSe int resultSetHoldability) throws SQLException { updateTransactionContext(); CallableStatement s = this.delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); - addStatement(s); return createProxyStatement(s); } @@ -132,7 +124,6 @@ public CallableStatement prepareCall(String sql, int resultSetType, int resultSe public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { updateTransactionContext(); PreparedStatement s = this.delegate.prepareStatement(sql, autoGeneratedKeys); - addStatement(s); return createProxyStatement(s); } @@ -140,7 +131,6 @@ public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) thr public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { updateTransactionContext(); PreparedStatement s = this.delegate.prepareStatement(sql, columnIndexes); - addStatement(s); return createProxyStatement(s); } diff --git a/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AtomikosJdbcConnectionProxy.java b/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AtomikosJdbcConnectionProxy.java index bcb0b12f..3de12c19 100644 --- a/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AtomikosJdbcConnectionProxy.java +++ b/public/transactions-jdbc/src/main/java/com/atomikos/jdbc/internal/AtomikosJdbcConnectionProxy.java @@ -166,12 +166,13 @@ public boolean equals(Object other) { if (other instanceof JdbcRequeueSynchronization) { JdbcRequeueSynchronization o = (JdbcRequeueSynchronization) other; ret = this.compositeTransaction.isSameTransaction(o.compositeTransaction); - } + ret = ret && this.proxy.equals(o.proxy); + } return ret; } public int hashCode() { - return compositeTransaction.hashCode(); + return compositeTransaction.hashCode() + proxy.hashCode(); } } diff --git a/public/transactions/src/main/java/com/atomikos/icatch/imp/CoordinatorImp.java b/public/transactions/src/main/java/com/atomikos/icatch/imp/CoordinatorImp.java index 7264b48e..1c13c3fb 100644 --- a/public/transactions/src/main/java/com/atomikos/icatch/imp/CoordinatorImp.java +++ b/public/transactions/src/main/java/com/atomikos/icatch/imp/CoordinatorImp.java @@ -10,8 +10,10 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Vector; import com.atomikos.finitestates.FSM; @@ -77,7 +79,7 @@ public class CoordinatorImp implements CompositeCoordinator, Participant, private CoordinatorStateHandler stateHandler_; private boolean single_threaded_2pc_; - private transient List synchronizations; + private transient Set synchronizations; private boolean timedout = false; private String recoveryDomainName; @@ -95,7 +97,7 @@ protected CoordinatorImp ( String root) setStateHandler ( new ActiveStateHandler ( this ) ); startThreads ( DEFAULT_MILLIS_BETWEEN_TIMER_WAKEUPS ); single_threaded_2pc_ = false; - synchronizations = new ArrayList(); + synchronizations = new LinkedHashSet(); } private void initFsm(TxState initialState) { @@ -153,7 +155,7 @@ protected CoordinatorImp ( String recoveryDomainName, String coordinatorId, Stri setStateHandler ( new ActiveStateHandler ( this ) ); startThreads ( DEFAULT_MILLIS_BETWEEN_TIMER_WAKEUPS ); - synchronizations = new ArrayList(); + synchronizations = new LinkedHashSet(); } /** @@ -166,7 +168,7 @@ public CoordinatorImp () initFsm(TxState.ACTIVE ); single_threaded_2pc_ = false; - synchronizations = new ArrayList(); + synchronizations = new LinkedHashSet(); } @@ -406,15 +408,15 @@ private void rememberSychronizationForAfterCompletion(Synchronization sync) { getSynchronizations().add(sync); } - private List getSynchronizations() { + private Set getSynchronizations() { synchronized(fsm_) { - if (synchronizations == null) synchronizations = new ArrayList(); + if (synchronizations == null) synchronizations = new LinkedHashSet(); return synchronizations; } } private List cloneAndReverseSynchronizationsForAfterCompletion() { - List src = getSynchronizations(); + Set src = getSynchronizations(); List ret = new ArrayList<>(src.size()); synchronized(fsm_) { ret.addAll(src); diff --git a/public/transactions/src/main/java/com/atomikos/icatch/imp/TransactionStateHandler.java b/public/transactions/src/main/java/com/atomikos/icatch/imp/TransactionStateHandler.java index af1671f2..a0d3963a 100644 --- a/public/transactions/src/main/java/com/atomikos/icatch/imp/TransactionStateHandler.java +++ b/public/transactions/src/main/java/com/atomikos/icatch/imp/TransactionStateHandler.java @@ -10,7 +10,9 @@ import java.util.ArrayList; import java.util.Enumeration; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; import java.util.Stack; import com.atomikos.icatch.CompositeTransaction; @@ -38,6 +40,7 @@ abstract class TransactionStateHandler implements SubTxAwareParticipant private int subtxs_; private List synchronizations_; // FIFO - cf case 20711 + private transient Set tempSynchronizations; private List subtxawares_; private CompositeTransactionImp ct_; @@ -47,16 +50,22 @@ protected TransactionStateHandler ( CompositeTransactionImp ct ) subtxs_ = 0; subtxawares_ = new ArrayList(); synchronizations_ = new Stack(); + tempSynchronizations = new LinkedHashSet(); + } protected TransactionStateHandler ( CompositeTransactionImp ct , TransactionStateHandler handler ) { subtxs_ = handler.getSubTransactionCount(); - synchronizations_ = handler.getSynchronizations(); subtxawares_ = handler.getSubtxawares(); ct_ = ct; + synchronizations_ = new Stack(); + tempSynchronizations = new LinkedHashSet(); + for (final Synchronization sync : handler.getSynchronizations()) { + localPushSynchronization(sync); + } } private synchronized void localDecSubTxCount() @@ -76,7 +85,10 @@ private synchronized int localGetSubTxCount() private synchronized void localPushSynchronization ( Synchronization sync ) { - synchronizations_.add ( sync ); + if (tempSynchronizations.contains(sync) == false) { + tempSynchronizations.add(sync); + synchronizations_.add ( sync ); + } } /** @@ -85,7 +97,10 @@ private synchronized void localPushSynchronization ( Synchronization sync ) */ private Synchronization localPopSynchronization() { Synchronization ret = null; - if (!synchronizations_.isEmpty()) ret = synchronizations_.remove(0); + if (!synchronizations_.isEmpty()) { + ret = synchronizations_.remove(0); + tempSynchronizations.remove(ret); + } return ret; }