Skip to content

Commit

Permalink
https://github.com/atomikos/transactions-essentials/issues/213
Browse files Browse the repository at this point in the history
Builder for Rest Client of remote-transactions
  • Loading branch information
martinaubele committed Jun 14, 2024
1 parent 115834e commit f7990c2
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public final class ConfigProperties {
public static final String LOG_LOCK_ACQUISITION_MAX_ATTEMPTS = "com.atomikos.icatch.log_lock_acquisition_max_attempts";
public static final String LOG_LOCK_ACQUISITION_RETRY_DELAY = "com.atomikos.icatch.log_lock_acquisition_retry_delay";


public static final String REST_CLIENT_BUILDER = "com.atomikos.remoting.rest_client_builder";

/**
* Replace ${...} sequence with the referenced value from the given properties or
* (if not found) the system properties -
Expand Down Expand Up @@ -272,6 +273,11 @@ public String getJvmId() {
return getProperty(JVM_ID_PROPERTY_NAME);

}

public String getRestClientBuilder() {
return getProperty(REST_CLIENT_BUILDER);

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -46,6 +47,7 @@

@Path("/atomikos")
@Consumes(HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON)
@Produces("text/plain")
public class AtomikosRestPort {

public static final String REST_URL_PROPERTY_NAME = "com.atomikos.icatch.rest_port_url";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.atomikos.remoting.twopc;

import static javax.ws.rs.client.ClientBuilder.newClient;

import javax.ws.rs.client.Client;

/**
* Default provider for standard Jaxrs Client without connection pool
*/
public class DefaultRestClientBuilder extends RestClientBuilder {

public Client build() {
Client client = newClient();
client.property("jersey.config.client.suppressHttpComplianceValidation", true);
client.register(ParticipantsProvider.class);
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.atomikos.icatch.Participant;
import com.atomikos.icatch.RollbackException;
import com.atomikos.icatch.SysException;
import com.atomikos.icatch.config.Configuration;
import com.atomikos.logging.Logger;
import com.atomikos.logging.LoggerFactory;
import com.atomikos.remoting.support.HeaderNames;
Expand All @@ -42,20 +43,29 @@ public class ParticipantAdapter implements Participant {

private static final Logger LOGGER = LoggerFactory.createLogger(ParticipantAdapter.class);

private final WebTarget target;
private static Client client;

private final URI uri;

private final Map<String, Integer> cascadeList = new HashMap<>();

public ParticipantAdapter(URI uri) {
Client client = newClient();
client.property("jersey.config.client.suppressHttpComplianceValidation", true);
client.register(ParticipantsProvider.class);
target = client.target(uri);
if (client == null) {
String className = Configuration.getConfigProperties().getRestClientBuilder();
try {
Class<?> builderClass = Thread.currentThread().getContextClassLoader().loadClass(className);
RestClientBuilder restClientBuilder = (RestClientBuilder)builderClass.newInstance();
client = restClientBuilder.build();
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
this.uri = uri;
}

@Override
public String getURI() {
return target.getUri().toASCIIString();
return uri.toASCIIString();
}

@Override
Expand All @@ -74,7 +84,7 @@ public int prepare() throws RollbackException, HeurHazardException, HeurMixedExc
LOGGER.logDebug("Calling prepare on " + getURI());
}
try {
int result = target.request()
int result = client.target(uri).request()
.buildPost(Entity.entity(cascadeList, HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON))
.invoke(Integer.class);
if (LOGGER.isTraceEnabled()) {
Expand All @@ -84,9 +94,16 @@ public int prepare() throws RollbackException, HeurHazardException, HeurMixedExc
} catch (WebApplicationException e) {
int status = e.getResponse().getStatus();
if (status == 404) {
// 404 writes a String entity - we have to consume it
consumeStringEntity(e.getResponse());
LOGGER.logWarning("Remote participant not available - any remote work will rollback...", e);
throw new RollbackException();
} else {
if (status == 409) {
// 409 writes a String entity - we have to consume it
consumeStringEntity(e.getResponse());
e.getResponse().close();
}
LOGGER.logWarning("Unexpected error during prepare - see stacktrace for more details...", e);
throw new HeurHazardException();
}
Expand All @@ -100,17 +117,21 @@ public void commit(boolean onePhase)
LOGGER.logDebug("Calling commit on " + getURI());
}

Response r = target.path(String.valueOf(onePhase)).request().buildPut(Entity.entity("", HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON)).invoke();
Response r = client.target(uri).path(String.valueOf(onePhase)).request().buildPut(Entity.entity("", HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON)).invoke();

if (r.getStatusInfo().getFamily() != Family.SUCCESSFUL) {
int status = r.getStatus();
switch (status) {
case 404:
// 404 writes a String entity - we have to consume it
consumeStringEntity(r);
if (onePhase) {
LOGGER.logWarning("Remote participant not available - default outcome will be rollback");
throw new RollbackException();
}
case 409:
// 409 writes a String entity - we have to consume it
consumeStringEntity(r);
LOGGER.logWarning("Unexpected 409 error on commit");
throw new HeurMixedException();
default:
Expand All @@ -127,12 +148,14 @@ public void rollback() throws HeurCommitException, HeurMixedException, HeurHazar
LOGGER.logDebug("Calling rollback on " + getURI());
}

Response r = target.request().header(HttpHeaders.CONTENT_TYPE, HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON).delete();
Response r = client.target(uri).request().header(HttpHeaders.CONTENT_TYPE, HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON).delete();

if (r.getStatusInfo().getFamily() != Family.SUCCESSFUL) {
int status = r.getStatus();
switch (status) {
case 409:
// 409 writes a String entity - we have to consume it
consumeStringEntity(r);
LOGGER.logWarning("Unexpected 409 error on rollback");
throw new HeurMixedException();
case 404:
Expand Down Expand Up @@ -174,5 +197,15 @@ public int hashCode() {
public String toString() {
return "ParticipantAdapter for: " + getURI();
}

private void consumeStringEntity(Response r) {
// the entity body has to be consumed to allow pooling of http connections.
// see https://stackoverflow.com/questions/27063667/httpclient-4-3-blocking-on-connection-pool
try {
r.readEntity(String.class);
} catch (Exception e) {
// catch exception. we only want to be sure that all content was cosumed
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.atomikos.remoting.twopc;

import javax.ws.rs.client.Client;

/**
* Abstract Builder for creation of Rest Client
*
* You can create a sublcass if you need some special handling like connection polling, timeouts, ...
* Implementation is defined by the property com.atomikos.remoting.rest_client_builder
* Default in transaction-default.properties is
* com.atomikos.remoting.rest_client_builder=com.atomikos.remoting.twopc.DefaultRestClientBuilder
*
* Here some example:
* <pre>
*
* public class PooledRestClientBuilder extends RestClientBuilder {
@Override
public Client build() {
ResteasyClientBuilder builder = new ResteasyClientBuilder();
ConfigProperties configProperties = Configuration.getConfigProperties();
String connectionPoolSizeProperty = configProperties.getProperty("com.atomikos.remoting.twopc.ParticipantAdapter.connectionPoolSize");
int connectionPoolSize = 20;
if (connectionPoolSizeProperty != null)
connectionPoolSize = Integer.valueOf(connectionPoolSizeProperty);
String connectTimeoutProperty = configProperties.getProperty("com.atomikos.remoting.twopc.ParticipantAdapter.connectTimeout");
int connectTimeout = 10;
if (connectTimeoutProperty != null)
connectTimeout = Integer.valueOf(connectTimeoutProperty);
String readTimeoutProperty = configProperties.getProperty("com.atomikos.remoting.twopc.ParticipantAdapter.readTimeout");
int readTimeout = 60;
if (readTimeoutProperty != null)
readTimeout = Integer.valueOf(readTimeoutProperty);
builder.connectTimeout(connectTimeout, TimeUnit.SECONDS);
builder.readTimeout(readTimeout, TimeUnit.SECONDS);
Client c = builder.connectionPoolSize(connectionPoolSize).build();
c.property("jersey.config.client.suppressHttpComplianceValidation", true);
c.register(ParticipantsProvider.class);
return c;
}
}
*
* </pre>
*/
public abstract class RestClientBuilder {

public abstract Client build();

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ com.atomikos.icatch.logcloud_datasource_name=logCloudDS
com.atomikos.icatch.throw_on_heuristic=false
com.atomikos.icatch.log_lock_acquisition_max_attempts=3
com.atomikos.icatch.log_lock_acquisition_retry_delay=1000
com.atomikos.remoting.rest_client_builder=com.atomikos.remoting.twopc.DefaultRestClientBuilder

0 comments on commit f7990c2

Please sign in to comment.