Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public final class ConfigProperties {
public static final String LOG_LOCK_ACQUISITION_MAX_ATTEMPTS = "com.atomikos.icatch.log_lock_acquisition_max_attempts";
public static final String LOG_LOCK_ACQUISITION_RETRY_DELAY = "com.atomikos.icatch.log_lock_acquisition_retry_delay";


public static final String REST_CLIENT_BUILDER = "com.atomikos.remoting.rest_client_builder";

/**
* Replace ${...} sequence with the referenced value from the given properties or
* (if not found) the system properties -
Expand Down Expand Up @@ -272,6 +273,11 @@ public String getJvmId() {
return getProperty(JVM_ID_PROPERTY_NAME);

}

public String getRestClientBuilder() {
return getProperty(REST_CLIENT_BUILDER);

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -46,6 +47,7 @@

@Path("/atomikos")
@Consumes(HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON)
@Produces("text/plain")
public class AtomikosRestPort {

public static final String REST_URL_PROPERTY_NAME = "com.atomikos.icatch.rest_port_url";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.atomikos.remoting.twopc;

import static javax.ws.rs.client.ClientBuilder.newClient;

import javax.ws.rs.client.Client;

/**
* Default provider for standard Jaxrs Client without connection pool
*/
public class DefaultRestClientBuilder extends RestClientBuilder {

public Client build() {
Client client = newClient();
client.property("jersey.config.client.suppressHttpComplianceValidation", true);
client.register(ParticipantsProvider.class);
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.atomikos.icatch.Participant;
import com.atomikos.icatch.RollbackException;
import com.atomikos.icatch.SysException;
import com.atomikos.icatch.config.Configuration;
import com.atomikos.logging.Logger;
import com.atomikos.logging.LoggerFactory;
import com.atomikos.remoting.support.HeaderNames;
Expand All @@ -42,20 +43,29 @@ public class ParticipantAdapter implements Participant {

private static final Logger LOGGER = LoggerFactory.createLogger(ParticipantAdapter.class);

private final WebTarget target;
private static Client client;

private final URI uri;

private final Map<String, Integer> cascadeList = new HashMap<>();

public ParticipantAdapter(URI uri) {
Client client = newClient();
client.property("jersey.config.client.suppressHttpComplianceValidation", true);
client.register(ParticipantsProvider.class);
target = client.target(uri);
if (client == null) {
String className = Configuration.getConfigProperties().getRestClientBuilder();
try {
Class<?> builderClass = Thread.currentThread().getContextClassLoader().loadClass(className);
RestClientBuilder restClientBuilder = (RestClientBuilder)builderClass.newInstance();
client = restClientBuilder.build();
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
this.uri = uri;
}

@Override
public String getURI() {
return target.getUri().toASCIIString();
return uri.toASCIIString();
}

@Override
Expand All @@ -74,7 +84,7 @@ public int prepare() throws RollbackException, HeurHazardException, HeurMixedExc
LOGGER.logDebug("Calling prepare on " + getURI());
}
try {
int result = target.request()
int result = client.target(uri).request()
.buildPost(Entity.entity(cascadeList, HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON))
.invoke(Integer.class);
if (LOGGER.isTraceEnabled()) {
Expand All @@ -84,9 +94,16 @@ public int prepare() throws RollbackException, HeurHazardException, HeurMixedExc
} catch (WebApplicationException e) {
int status = e.getResponse().getStatus();
if (status == 404) {
// 404 writes a String entity - we have to consume it
consumeStringEntity(e.getResponse());
LOGGER.logWarning("Remote participant not available - any remote work will rollback...", e);
throw new RollbackException();
} else {
if (status == 409) {
// 409 writes a String entity - we have to consume it
consumeStringEntity(e.getResponse());
e.getResponse().close();
}
LOGGER.logWarning("Unexpected error during prepare - see stacktrace for more details...", e);
throw new HeurHazardException();
}
Expand All @@ -100,17 +117,21 @@ public void commit(boolean onePhase)
LOGGER.logDebug("Calling commit on " + getURI());
}

Response r = target.path(String.valueOf(onePhase)).request().buildPut(Entity.entity("", HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON)).invoke();
Response r = client.target(uri).path(String.valueOf(onePhase)).request().buildPut(Entity.entity("", HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON)).invoke();

if (r.getStatusInfo().getFamily() != Family.SUCCESSFUL) {
int status = r.getStatus();
switch (status) {
case 404:
// 404 writes a String entity - we have to consume it
consumeStringEntity(r);
if (onePhase) {
LOGGER.logWarning("Remote participant not available - default outcome will be rollback");
throw new RollbackException();
}
case 409:
// 409 writes a String entity - we have to consume it
consumeStringEntity(r);
LOGGER.logWarning("Unexpected 409 error on commit");
throw new HeurMixedException();
default:
Expand All @@ -127,12 +148,14 @@ public void rollback() throws HeurCommitException, HeurMixedException, HeurHazar
LOGGER.logDebug("Calling rollback on " + getURI());
}

Response r = target.request().header(HttpHeaders.CONTENT_TYPE, HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON).delete();
Response r = client.target(uri).request().header(HttpHeaders.CONTENT_TYPE, HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON).delete();

if (r.getStatusInfo().getFamily() != Family.SUCCESSFUL) {
int status = r.getStatus();
switch (status) {
case 409:
// 409 writes a String entity - we have to consume it
consumeStringEntity(r);
LOGGER.logWarning("Unexpected 409 error on rollback");
throw new HeurMixedException();
case 404:
Expand Down Expand Up @@ -174,5 +197,15 @@ public int hashCode() {
public String toString() {
return "ParticipantAdapter for: " + getURI();
}

private void consumeStringEntity(Response r) {
// the entity body has to be consumed to allow pooling of http connections.
// see https://stackoverflow.com/questions/27063667/httpclient-4-3-blocking-on-connection-pool
try {
r.readEntity(String.class);
} catch (Exception e) {
// catch exception. we only want to be sure that all content was cosumed
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.atomikos.remoting.twopc;

import javax.ws.rs.client.Client;

/**
* Abstract Builder for creation of Rest Client
*
* You can create a sublcass if you need some special handling like connection polling, timeouts, ...
* Implementation is defined by the property com.atomikos.remoting.rest_client_builder
* Default in transaction-default.properties is
* com.atomikos.remoting.rest_client_builder=com.atomikos.remoting.twopc.DefaultRestClientBuilder
*
* Here some example:
* <pre>
*
* public class PooledRestClientBuilder extends RestClientBuilder {

@Override
public Client build() {
ResteasyClientBuilder builder = new ResteasyClientBuilder();

ConfigProperties configProperties = Configuration.getConfigProperties();
String connectionPoolSizeProperty = configProperties.getProperty("com.atomikos.remoting.twopc.ParticipantAdapter.connectionPoolSize");
int connectionPoolSize = 20;
if (connectionPoolSizeProperty != null)
connectionPoolSize = Integer.valueOf(connectionPoolSizeProperty);

String connectTimeoutProperty = configProperties.getProperty("com.atomikos.remoting.twopc.ParticipantAdapter.connectTimeout");
int connectTimeout = 10;
if (connectTimeoutProperty != null)
connectTimeout = Integer.valueOf(connectTimeoutProperty);

String readTimeoutProperty = configProperties.getProperty("com.atomikos.remoting.twopc.ParticipantAdapter.readTimeout");
int readTimeout = 60;
if (readTimeoutProperty != null)
readTimeout = Integer.valueOf(readTimeoutProperty);

builder.connectTimeout(connectTimeout, TimeUnit.SECONDS);
builder.readTimeout(readTimeout, TimeUnit.SECONDS);
Client c = builder.connectionPoolSize(connectionPoolSize).build();
c.property("jersey.config.client.suppressHttpComplianceValidation", true);
c.register(ParticipantsProvider.class);
return c;
}
}
*
* </pre>
*/
public abstract class RestClientBuilder {

public abstract Client build();

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ com.atomikos.icatch.logcloud_datasource_name=logCloudDS
com.atomikos.icatch.throw_on_heuristic=false
com.atomikos.icatch.log_lock_acquisition_max_attempts=3
com.atomikos.icatch.log_lock_acquisition_retry_delay=1000
com.atomikos.remoting.rest_client_builder=com.atomikos.remoting.twopc.DefaultRestClientBuilder