Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import com.atomikos.logging.Logger;
import com.atomikos.logging.LoggerFactory;
Expand All @@ -27,7 +27,7 @@ public abstract class AbstractJdbcConnectionProxy extends DynamicProxySupport<Co

private static final Logger LOGGER = LoggerFactory.createLogger(AbstractJdbcConnectionProxy.class);

private List<Statement> statements = new ArrayList<Statement>();
private Set<Statement> statements = new LinkedHashSet<Statement>();

public AbstractJdbcConnectionProxy(Connection delegate) {
super(delegate);
Expand Down Expand Up @@ -63,15 +63,13 @@ private <S extends Statement> S createProxyStatement(S s) {
public PreparedStatement prepareStatement(String sql) throws SQLException {
updateTransactionContext();
PreparedStatement s = this.delegate.prepareStatement(sql);
addStatement(s);
return createProxyStatement(s);
}

@Proxied
public CallableStatement prepareCall(String sql) throws SQLException {
updateTransactionContext();
CallableStatement s = this.delegate.prepareCall(sql);
addStatement(s);
return createProxyStatement(s);
}

Expand All @@ -80,7 +78,6 @@ public CallableStatement prepareCall(String sql) throws SQLException {
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
updateTransactionContext();
Statement s = this.delegate.createStatement(resultSetType, resultSetConcurrency);
addStatement(s);
return createProxyStatement(s);
}

Expand All @@ -89,15 +86,13 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res
throws SQLException {
updateTransactionContext();
PreparedStatement s = this.delegate.prepareStatement(sql, resultSetType, resultSetConcurrency);
addStatement(s);
return createProxyStatement(s);
}

@Proxied
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
updateTransactionContext();
CallableStatement s = this.delegate.prepareCall(sql, resultSetType, resultSetConcurrency);
addStatement(s);
return createProxyStatement(s);
}

Expand All @@ -106,7 +101,6 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency, in
throws SQLException {
updateTransactionContext();
Statement s = this.delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
addStatement(s);
return createProxyStatement(s);
}

Expand All @@ -115,7 +109,6 @@ public Statement createStatement(int resultSetType, int resultSetConcurrency, in
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
updateTransactionContext();
PreparedStatement s = this.delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
addStatement(s);
return createProxyStatement(s);
}

Expand All @@ -124,23 +117,20 @@ public CallableStatement prepareCall(String sql, int resultSetType, int resultSe
int resultSetHoldability) throws SQLException {
updateTransactionContext();
CallableStatement s = this.delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
addStatement(s);
return createProxyStatement(s);
}

@Proxied
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
updateTransactionContext();
PreparedStatement s = this.delegate.prepareStatement(sql, autoGeneratedKeys);
addStatement(s);
return createProxyStatement(s);
}

@Proxied
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
updateTransactionContext();
PreparedStatement s = this.delegate.prepareStatement(sql, columnIndexes);
addStatement(s);
return createProxyStatement(s);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,13 @@ public boolean equals(Object other) {
if (other instanceof JdbcRequeueSynchronization) {
JdbcRequeueSynchronization o = (JdbcRequeueSynchronization) other;
ret = this.compositeTransaction.isSameTransaction(o.compositeTransaction);
}
ret = ret && this.proxy.equals(o.proxy);
}
return ret;
}

public int hashCode() {
return compositeTransaction.hashCode();
return compositeTransaction.hashCode() + proxy.hashCode();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;

import com.atomikos.finitestates.FSM;
Expand Down Expand Up @@ -77,7 +79,7 @@ public class CoordinatorImp implements CompositeCoordinator, Participant,

private CoordinatorStateHandler stateHandler_;
private boolean single_threaded_2pc_;
private transient List<Synchronization> synchronizations;
private transient Set<Synchronization> synchronizations;
private boolean timedout = false;

private String recoveryDomainName;
Expand All @@ -95,7 +97,7 @@ protected CoordinatorImp ( String root)
setStateHandler ( new ActiveStateHandler ( this ) );
startThreads ( DEFAULT_MILLIS_BETWEEN_TIMER_WAKEUPS );
single_threaded_2pc_ = false;
synchronizations = new ArrayList<Synchronization>();
synchronizations = new LinkedHashSet<Synchronization>();
}

private void initFsm(TxState initialState) {
Expand Down Expand Up @@ -153,7 +155,7 @@ protected CoordinatorImp ( String recoveryDomainName, String coordinatorId, Stri

setStateHandler ( new ActiveStateHandler ( this ) );
startThreads ( DEFAULT_MILLIS_BETWEEN_TIMER_WAKEUPS );
synchronizations = new ArrayList<Synchronization>();
synchronizations = new LinkedHashSet<Synchronization>();
}

/**
Expand All @@ -166,7 +168,7 @@ public CoordinatorImp ()
initFsm(TxState.ACTIVE );

single_threaded_2pc_ = false;
synchronizations = new ArrayList<Synchronization>();
synchronizations = new LinkedHashSet<Synchronization>();

}

Expand Down Expand Up @@ -406,15 +408,15 @@ private void rememberSychronizationForAfterCompletion(Synchronization sync) {
getSynchronizations().add(sync);
}

private List<Synchronization> getSynchronizations() {
private Set<Synchronization> getSynchronizations() {
synchronized(fsm_) {
if (synchronizations == null) synchronizations = new ArrayList<Synchronization>();
if (synchronizations == null) synchronizations = new LinkedHashSet<Synchronization>();
return synchronizations;
}
}

private List<Synchronization> cloneAndReverseSynchronizationsForAfterCompletion() {
List<Synchronization> src = getSynchronizations();
Set<Synchronization> src = getSynchronizations();
List<Synchronization> ret = new ArrayList<>(src.size());
synchronized(fsm_) {
ret.addAll(src);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.Stack;

import com.atomikos.icatch.CompositeTransaction;
Expand Down Expand Up @@ -38,6 +40,7 @@ abstract class TransactionStateHandler implements SubTxAwareParticipant

private int subtxs_;
private List<Synchronization> synchronizations_; // FIFO - cf case 20711
private transient Set<Synchronization> tempSynchronizations;
private List<SubTxAwareParticipant> subtxawares_;
private CompositeTransactionImp ct_;

Expand All @@ -47,16 +50,22 @@ protected TransactionStateHandler ( CompositeTransactionImp ct )
subtxs_ = 0;
subtxawares_ = new ArrayList<SubTxAwareParticipant>();
synchronizations_ = new Stack<Synchronization>();
tempSynchronizations = new LinkedHashSet<Synchronization>();

}

protected TransactionStateHandler ( CompositeTransactionImp ct ,
TransactionStateHandler handler )
{
subtxs_ = handler.getSubTransactionCount();
synchronizations_ = handler.getSynchronizations();
subtxawares_ = handler.getSubtxawares();
ct_ = ct;

synchronizations_ = new Stack<Synchronization>();
tempSynchronizations = new LinkedHashSet<Synchronization>();
for (final Synchronization sync : handler.getSynchronizations()) {
localPushSynchronization(sync);
}
}

private synchronized void localDecSubTxCount()
Expand All @@ -76,7 +85,10 @@ private synchronized int localGetSubTxCount()

private synchronized void localPushSynchronization ( Synchronization sync )
{
synchronizations_.add ( sync );
if (tempSynchronizations.contains(sync) == false) {
tempSynchronizations.add(sync);
synchronizations_.add ( sync );
}
}

/**
Expand All @@ -85,7 +97,10 @@ private synchronized void localPushSynchronization ( Synchronization sync )
*/
private Synchronization localPopSynchronization() {
Synchronization ret = null;
if (!synchronizations_.isEmpty()) ret = synchronizations_.remove(0);
if (!synchronizations_.isEmpty()) {
ret = synchronizations_.remove(0);
tempSynchronizations.remove(ret);
}
return ret;
}

Expand Down