-
Notifications
You must be signed in to change notification settings - Fork 48
Custom visibility timeout for each message in SQS queue #89
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 14 commits
e5278a0
f34294a
3b3fa89
120d14c
e0d6d7e
547bdce
93eebe7
fbcbe24
0b8ebf6
0c6610f
dbfab5a
38fe205
43f159c
a45e68e
6cd28f5
4b743bb
2f08cfb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,26 +29,37 @@ | |
| */ | ||
| public abstract class AbstractSQSConnector implements SQSConnector { | ||
| protected static final String AWS_ERROR_CODE_AUTHENTICATION = "InvalidClientTokenId"; | ||
|
|
||
| protected final Log _log = LogFactory.getLog(getClass()); | ||
|
|
||
| private final long _receiveCheckIntervalMs; | ||
| private final boolean _isAsync; | ||
| private final int _visibilityTimeoutOnReset; | ||
|
|
||
| protected AbstractSQSConnector(long receiveCheckIntervalMs) | ||
| { | ||
| this(receiveCheckIntervalMs, false); | ||
| } | ||
|
|
||
| protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) | ||
| protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) | ||
| { | ||
| _receiveCheckIntervalMs = receiveCheckIntervalMs; | ||
| _isAsync = isAsync; | ||
| _visibilityTimeoutOnReset = visibilityTimeoutOnReset; | ||
| } | ||
|
|
||
| protected AbstractSQSConnector(long receiveCheckIntervalMs, boolean isAsync) | ||
| { | ||
| this(receiveCheckIntervalMs, isAsync, 0); | ||
| } | ||
|
|
||
| public boolean isAsync() { | ||
| return _isAsync; | ||
| } | ||
|
|
||
| public int getVisibilityTimeoutOnReset() { | ||
| return _visibilityTimeoutOnReset; | ||
| } | ||
|
|
||
| public void sendMessage(NevadoDestination destination, NevadoMessage message) throws JMSException | ||
| { | ||
|
|
@@ -127,7 +138,7 @@ public void resetMessage(NevadoMessage message) throws JMSException { | |
| "Did this come from an SQS queue?"); | ||
| } | ||
| SQSQueue sqsQueue = getSQSQueue(message.getNevadoDestination()); | ||
| sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, 0); | ||
| sqsQueue.setMessageVisibilityTimeout(sqsReceiptHandle, _visibilityTimeoutOnReset); // Customize message visibility timeout | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -179,7 +190,9 @@ protected SQSMessage receiveSQSMessage(NevadoConnection connection, NevadoDestin | |
| if (sqsMessage != null && !connection.isRunning()) { | ||
| // Connection was stopped while the REST call to SQS was being made | ||
| try { | ||
| sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), 0); // Make it immediately available to the next requestor | ||
| if(sqsMessage.getReceiptHandle() != null && StringUtils.isNotEmpty(sqsMessage.getReceiptHandle()) && sqsMessage.getReceiptHandle().trim().length() > 0) { | ||
|
||
| sqsQueue.setMessageVisibilityTimeout(sqsMessage.getReceiptHandle(), _visibilityTimeoutOnReset); // Customize message visibility timeout | ||
| } | ||
| } catch (JMSException e) { | ||
| String exMessage = "Unable to reset visibility timeout for message: " + e.getMessage(); | ||
| _log.warn(exMessage, e); // Non-fatal. Just means the message will disappear until the visibility timeout expires. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -47,16 +47,25 @@ | |
| public class AmazonAwsSQSConnector extends AbstractSQSConnector { | ||
| public static final String MESSAGE_ATTRIBUTE_APPROXIMATE_RECEIVE_COUNT = "ApproximateReceiveCount"; | ||
|
|
||
| private final AmazonSQS _amazonSQS; | ||
| private final AmazonSNS _amazonSNS; | ||
| private AmazonSQS _amazonSQS; | ||
| private AmazonSNS _amazonSNS; | ||
|
|
||
| public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs) { | ||
| this(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, false); | ||
| } | ||
|
|
||
| public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync) { | ||
| super(receiveCheckIntervalMs, isAsync); | ||
| AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); | ||
| initializeConnection(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, isAsync); | ||
| } | ||
|
|
||
| public AmazonAwsSQSConnector(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync, int visibilityTimeoutOnReset) { | ||
| super(receiveCheckIntervalMs, isAsync, visibilityTimeoutOnReset); | ||
| initializeConnection(awsAccessKey, awsSecretKey, isSecure, receiveCheckIntervalMs, isAsync); | ||
| } | ||
|
|
||
| private void initializeConnection(String awsAccessKey, String awsSecretKey, boolean isSecure, long receiveCheckIntervalMs, boolean isAsync){ | ||
|
||
| AWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey); | ||
| ClientConfiguration clientConfiguration = new ClientConfiguration(); | ||
| String proxyHost = System.getProperty("http.proxyHost"); | ||
| String proxyPort = System.getProperty("http.proxyPort"); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove extra space. (But not the blank line. There are three spaces here.)