Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
08d4ab6
Added support for instance role profile credentials
dbonicelli-oa Sep 5, 2014
78cfb2b
Created version 1.3.2-OA
dbonicelli-oa Feb 17, 2015
ade3824
Fixed Nevado dependency
dbonicelli-oa Feb 17, 2015
494246f
Merge remote-tracking branch 'upstream/master'
dbonicelli-oa Feb 17, 2015
7e38f45
Merge remote-tracking branch 'upstream/master'
dbonicelli-oa Feb 17, 2015
3ca063d
Modified SQS connector to use instance profile credentials if access …
dbonicelli-oa Feb 17, 2015
ed9662d
Fixed parent version
dbonicelli-oa Feb 17, 2015
35823a2
Update pom.xml
objectivearts Feb 18, 2015
1890f20
Update pom.xml
objectivearts Feb 18, 2015
73f4e3e
Modify SQSConnector to pass CredentialsProvider instead of Credential…
mdeceunynck Jul 2, 2019
334438b
Make changes to support upgrade to AWS-SDK 12.
mdeceunynck Oct 10, 2022
34c422e
Make Version 1.3.2-OA3
mdeceunynck Mar 10, 2023
272f0ed
Make Version 1.3.2-OA4-SNAPSHOT. Upgrade AWS SDK for SQS and SNS. A…
mdeceunynck May 17, 2025
64bac31
Make Version 1.3.2-OA4.
mdeceunynck May 19, 2025
0b3ecbb
Change naming of Durable Queues to be much shorter.
mdeceunynck Jul 28, 2025
b0193c5
Merge Develop into Master.
mdeceunynck Jul 28, 2025
619464e
Make Version 1.3.2-OA5
mdeceunynck Jul 28, 2025
c176e0e
Make Version 1.3.2-OA5
mdeceunynck Jul 28, 2025
c1acd9b
Make Version 1.3.2-OA6-SNAPSHOT
mdeceunynck Jul 28, 2025
65d4ed0
Remove Log4j pom entry. Replace with corrected version.
mdeceunynck Sep 4, 2025
3202bcd
Merge pull request #1 from Objective-Arts/develop
mdeceunynck Sep 4, 2025
52a0d32
Make Version 1.3.2-OA6
mdeceunynck Sep 4, 2025
1d95b9f
Merge Master into Develop. Make Version 1.3.2-OA7-SNAPSHOT.
mdeceunynck Sep 4, 2025
546cf28
Upgrade Spring Version from 3.0.5.RELEASE to 4.3.21.RELEASE.
mdeceunynck Oct 8, 2025
c21d764
Upgrade org.json from 2009.... to 2023.... Also, remove an old sprin…
mdeceunynck Oct 8, 2025
73da887
Upgrade Jackson from 2.12.7 to 2.20. Add an exclusion to the aws-jav…
mdeceunynck Oct 8, 2025
4c826c8
Make Version 1.3.2-OA7.
mdeceunynck Oct 10, 2025
80013a9
Merge pull request #2 from Objective-Arts/Release_1.3.2-OA7
mdeceunynck Oct 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 56 additions & 19 deletions nevado-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<groupId>org.skyscreamer</groupId>
<artifactId>nevado</artifactId>
<version>1.3.3-SNAPSHOT</version>
<version>1.3.2</version>
</parent>

<groupId>org.skyscreamer</groupId>
<artifactId>nevado-jms</artifactId>
<version>1.3.3-SNAPSHOT</version>
<version>1.3.2-OA7</version>
<packaging>jar</packaging>

<name>Nevado JMS</name>
Expand Down Expand Up @@ -51,29 +51,59 @@
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20090211</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.12</version>
<version>20231013</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.codehaus.jackson</groupId>-->
<!-- <artifactId>jackson-core-asl</artifactId>-->
<!-- <version>1.9.12</version>-->
<!-- </dependency>-->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.2</version>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>2.23.1</version> <!-- or latest stable -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.23.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<version>1.12.377</version>
<exclusions>
<!-- Aws SDK pulls in Jackson version 2.12.7, which conflicts a bit with the 2.20. -->
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.6.12</version>
<artifactId>aws-java-sdk-sns</artifactId>
<version>1.12.377</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.20</version>
</dependency>

<dependency>
Expand All @@ -90,16 +120,23 @@
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>2.0.0.RELEASE</version>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.integration</groupId>-->
<!-- <artifactId>spring-integration-jms</artifactId>-->
<!-- <version>2.0.0.RELEASE</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>activemq</groupId>
<artifactId>activemq</artifactId>
Expand All @@ -116,13 +153,13 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>

<properties>
<spring.version>3.0.5.RELEASE</spring.version>
<spring.version>4.3.21.RELEASE</spring.version>
</properties>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,10 @@ protected String getDurableEndpointQueueName(String durableSubscriptionName) {

if (_connection.getClientID() != null)
{
queueName += "_client-" + _connection.getClientID() + "";
queueName += "_" + _connection.getClientID() + "";
}


return queueName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,17 @@
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSAsync;
import com.amazonaws.services.sns.AmazonSNSAsyncClient;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.auth.*;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.client.builder.ExecutorFactory;
import com.amazonaws.services.sns.*;
import com.amazonaws.services.sns.model.*;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.*;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.ListQueuesRequest;
import com.amazonaws.services.sqs.model.ListQueuesResult;
import org.apache.commons.lang.StringUtils;
import org.skyscreamer.nevado.jms.connector.AbstractSQSConnector;
import org.skyscreamer.nevado.jms.connector.SQSMessage;
import org.skyscreamer.nevado.jms.connector.SQSQueue;
Expand Down Expand Up @@ -53,13 +50,12 @@ public class AmazonAwsSQSConnector extends AbstractSQSConnector {
private boolean _testAlwaysPasses = false;


public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs) {
this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false);
public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint, boolean isSecure, long receiveCheckIntervalMs) {
this(awsAccessKey, awsSecretKey, awsSQSEndpoint, awsSNSEndpoint, isSecure, receiveCheckIntervalMs, false);
}

public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync) {
public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync) {
super(receiveCheckIntervalMs, isAsync);
AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey);
ClientConfiguration clientConfiguration = new ClientConfiguration();
String proxyHost = System.getProperty("http.proxyHost");
String proxyPort = System.getProperty("http.proxyPort");
Expand All @@ -70,13 +66,77 @@ public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean i
}
}
clientConfiguration.setProtocol(isSecure ? Protocol.HTTPS : Protocol.HTTP);
AwsClientBuilder.EndpointConfiguration awsSQSEndpointConfiguration = new AwsClientBuilder.EndpointConfiguration(awsSQSEndpoint, null);
AwsClientBuilder.EndpointConfiguration awsSNSEndpointConfiguration = new AwsClientBuilder.EndpointConfiguration(awsSNSEndpoint, null);

if (isAsync) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
_amazonSQS = new AmazonSQSAsyncClient(awsCredentials, clientConfiguration, executorService);
_amazonSNS = new AmazonSNSAsyncClient(awsCredentials, clientConfiguration, executorService);
} else {
_amazonSQS = new AmazonSQSClient(awsCredentials, clientConfiguration);
_amazonSNS = new AmazonSNSClient(awsCredentials, clientConfiguration);
ExecutorFactory executorFactory = new ExecutorFactory() {
@Override
public ExecutorService newExecutor() {
return Executors.newSingleThreadExecutor();
}
};

if(StringUtils.isNotEmpty(awsAccessKey) && StringUtils.isNotEmpty(awsSecretKey)) {
BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey);
_amazonSQS = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
.withClientConfiguration(clientConfiguration)
.withExecutorFactory(executorFactory)
.withEndpointConfiguration(awsSQSEndpointConfiguration)
.build();
_amazonSNS = AmazonSNSAsyncClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
.withClientConfiguration(clientConfiguration)
.withExecutorFactory(executorFactory)
.withEndpointConfiguration(awsSNSEndpointConfiguration)
.build();
}
else {
_amazonSQS = AmazonSQSAsyncClientBuilder
.standard()
.withClientConfiguration(clientConfiguration)
.withCredentials(InstanceProfileCredentialsProvider.getInstance())
.withExecutorFactory(executorFactory)
.withEndpointConfiguration(awsSQSEndpointConfiguration)
.build();
_amazonSNS = AmazonSNSAsyncClientBuilder
.standard()
.withClientConfiguration(clientConfiguration)
.withCredentials(InstanceProfileCredentialsProvider.getInstance())
.withExecutorFactory(executorFactory)
.withEndpointConfiguration(awsSNSEndpointConfiguration)
.build();
}
}
else {
if(StringUtils.isNotEmpty(awsAccessKey) && StringUtils.isNotEmpty(awsSecretKey)) {
BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey);
_amazonSQS = AmazonSQSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
.withClientConfiguration(clientConfiguration)
.withEndpointConfiguration(awsSQSEndpointConfiguration)
.build();
_amazonSNS = AmazonSNSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
.withClientConfiguration(clientConfiguration)
.withEndpointConfiguration(awsSNSEndpointConfiguration)
.build();
}
else {
_amazonSQS = AmazonSQSClientBuilder
.standard()
.withClientConfiguration(clientConfiguration)
.withCredentials(InstanceProfileCredentialsProvider.getInstance())
.withEndpointConfiguration(awsSQSEndpointConfiguration)
.build();
_amazonSNS = AmazonSNSClientBuilder
.standard()
.withClientConfiguration(clientConfiguration)
.withCredentials(InstanceProfileCredentialsProvider.getInstance())
.withEndpointConfiguration(awsSNSEndpointConfiguration)
.build();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,13 @@ public class AmazonAwsSQSConnectorFactory extends AbstractSQSConnectorFactory {

@Override
public AmazonAwsSQSConnector getInstance(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint) {
AmazonAwsSQSConnector amazonAwsSQSConnector = createConnector(awsAccessKey, awsSecretKey);
AmazonAwsSQSConnector amazonAwsSQSConnector = createConnector(awsAccessKey, awsSecretKey, awsSQSEndpoint, awsSNSEndpoint);
amazonAwsSQSConnector.setTestAlwaysPasses(_testAlwaysPasses);
if (StringUtils.isNotEmpty(awsSQSEndpoint)) {
amazonAwsSQSConnector.getAmazonSQS().setEndpoint(awsSQSEndpoint);
}
if (StringUtils.isNotEmpty(awsSNSEndpoint)) {
amazonAwsSQSConnector.getAmazonSNS().setEndpoint(awsSNSEndpoint);
}
return amazonAwsSQSConnector;
}

protected AmazonAwsSQSConnector createConnector(String awsAccessKey, String awsSecretKey) {
return new AmazonAwsSQSConnector(awsAccessKey, awsSecretKey, _isSecure, _receiveCheckIntervalMs, _useAsyncSend);
protected AmazonAwsSQSConnector createConnector(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint) {
return new AmazonAwsSQSConnector(awsAccessKey, awsSecretKey, awsSQSEndpoint, awsSNSEndpoint, _isSecure, _receiveCheckIntervalMs, _useAsyncSend);
}

public void setUseAsyncSend(boolean useAsyncSend) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,43 +1,43 @@
package org.skyscreamer.nevado.jms.connector.amazonaws;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.skyscreamer.nevado.jms.message.NevadoMessage;
import org.skyscreamer.nevado.jms.message.NevadoTextMessage;
/**
* Overrides the serialisation handling from AbstractSQSConnector so that raw
* text messages can be received/sent without wrapping.
*
* @author qi.chen
*/
public class PlainTextAmazonSQSConnector extends AmazonAwsSQSConnector {
public PlainTextAmazonSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure,
long receiveCheckIntervalMs) {
super(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs);
}
public PlainTextAmazonSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure,
long receiveCheckIntervalMs, boolean isAsync) {
super(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, isAsync);
}
@Override
protected String serializeMessage(NevadoMessage message) throws JMSException {
if (message instanceof TextMessage) {
return ((TextMessage) message).getText();
} else {
throw new UnsupportedOperationException(getClass().getSimpleName() + " does not support " + message);
}
}
@Override
protected NevadoMessage deserializeMessage(String serializedMessage) throws JMSException {
NevadoTextMessage out = new NevadoTextMessage();
out.setText(serializedMessage);
return out;
}
}
package org.skyscreamer.nevado.jms.connector.amazonaws;

import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.skyscreamer.nevado.jms.message.NevadoMessage;
import org.skyscreamer.nevado.jms.message.NevadoTextMessage;

/**
* Overrides the serialisation handling from AbstractSQSConnector so that raw
* text messages can be received/sent without wrapping.
*
* @author qi.chen
*/
public class PlainTextAmazonSQSConnector extends AmazonAwsSQSConnector {

public PlainTextAmazonSQSConnector(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint, boolean isSecure,
long receiveCheckIntervalMs) {
super(awsAccessKey, awsSecretKey, awsSQSEndpoint, awsSNSEndpoint, isSecure, receiveCheckIntervalMs);
}

public PlainTextAmazonSQSConnector(String awsAccessKey, String awsSecretKey, String awsSQSEndpoint, String awsSNSEndpoint, boolean isSecure,
long receiveCheckIntervalMs, boolean isAsync) {
super(awsAccessKey, awsSecretKey, awsSQSEndpoint, awsSNSEndpoint, isSecure, receiveCheckIntervalMs, isAsync);
}

@Override
protected String serializeMessage(NevadoMessage message) throws JMSException {
if (message instanceof TextMessage) {
return ((TextMessage) message).getText();
} else {
throw new UnsupportedOperationException(getClass().getSimpleName() + " does not support " + message);
}
}

@Override
protected NevadoMessage deserializeMessage(String serializedMessage) throws JMSException {
NevadoTextMessage out = new NevadoTextMessage();
out.setText(serializedMessage);
return out;
}

}
Loading