Skip to content

Commit 7d75fc0

Browse files
authored
Add metric for sqs receive message failures for s3-sqs source (#6102)
Signed-off-by: Taylor Gray <[email protected]>
1 parent fafc6ec commit 7d75fc0

File tree

2 files changed

+9
-0
lines changed
  • data-prepper-plugins/s3-source/src

2 files changed

+9
-0
lines changed

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
public class SqsWorker implements Runnable {
5050
private static final Logger LOG = LoggerFactory.getLogger(SqsWorker.class);
5151
static final String SQS_MESSAGES_RECEIVED_METRIC_NAME = "sqsMessagesReceived";
52+
static final String SQS_RECEIVE_MESSAGES_FAILED_METRIC_NAME = "sqsReceiveMessageFailed";
5253
static final String SQS_MESSAGES_DELETED_METRIC_NAME = "sqsMessagesDeleted";
5354
static final String SQS_MESSAGES_FAILED_METRIC_NAME = "sqsMessagesFailed";
5455
static final String SQS_MESSAGES_DELETE_FAILED_METRIC_NAME = "sqsMessagesDeleteFailed";
@@ -65,6 +66,7 @@ public class SqsWorker implements Runnable {
6566
private final S3EventFilter objectCreatedFilter;
6667
private final S3EventFilter evenBridgeObjectCreatedFilter;
6768
private final Counter sqsMessagesReceivedCounter;
69+
private final Counter sqsReceiveMessagesFailedCounter;
6870
private final Counter sqsMessagesDeletedCounter;
6971
private final Counter sqsMessagesFailedCounter;
7072
private final Counter s3ObjectsEmptyCounter;
@@ -105,6 +107,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
105107
s3ObjectsEmptyCounter = pluginMetrics.counter(S3_OBJECTS_EMPTY_METRIC_NAME);
106108
sqsMessagesDeleteFailedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETE_FAILED_METRIC_NAME);
107109
sqsMessageDelayTimer = pluginMetrics.timer(SQS_MESSAGE_DELAY_METRIC_NAME);
110+
sqsReceiveMessagesFailedCounter = pluginMetrics.counter(SQS_RECEIVE_MESSAGES_FAILED_METRIC_NAME);
108111
acknowledgementSetCallbackCounter = pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME);
109112
sqsVisibilityTimeoutChangedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME);
110113
sqsVisibilityTimeoutChangeFailedCount = pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME);
@@ -161,6 +164,7 @@ private List<Message> getMessagesFromSqs() {
161164
return messages;
162165
} catch (final SqsException | StsException e) {
163166
LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage());
167+
sqsReceiveMessagesFailedCounter.increment();
164168
applyBackoff();
165169
return Collections.emptyList();
166170
}

data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import static org.opensearch.dataprepper.plugins.source.s3.SqsWorker.SQS_MESSAGES_FAILED_METRIC_NAME;
8787
import static org.opensearch.dataprepper.plugins.source.s3.SqsWorker.SQS_MESSAGES_RECEIVED_METRIC_NAME;
8888
import static org.opensearch.dataprepper.plugins.source.s3.SqsWorker.SQS_MESSAGE_DELAY_METRIC_NAME;
89+
import static org.opensearch.dataprepper.plugins.source.s3.SqsWorker.SQS_RECEIVE_MESSAGES_FAILED_METRIC_NAME;
8990
import static org.opensearch.dataprepper.plugins.source.s3.SqsWorker.SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME;
9091

9192
@ExtendWith(MockitoExtension.class)
@@ -97,6 +98,7 @@ class SqsWorkerTest {
9798
private PluginMetrics pluginMetrics;
9899
private Backoff backoff;
99100
private Counter sqsMessagesReceivedCounter;
101+
private Counter sqsReceiveMessageFailedCounter;
100102
private Counter sqsMessagesDeletedCounter;
101103
private Counter sqsMessagesFailedCounter;
102104
private Counter sqsMessagesDeleteFailedCounter;
@@ -132,6 +134,7 @@ void setUp() {
132134
sqsMessagesDeletedCounter = mock(Counter.class);
133135
sqsMessagesFailedCounter = mock(Counter.class);
134136
sqsMessagesDeleteFailedCounter = mock(Counter.class);
137+
sqsReceiveMessageFailedCounter = mock(Counter.class);
135138
s3ObjectsEmptyCounter = mock(Counter.class);
136139
sqsMessageDelayTimer = mock(Timer.class);
137140
when(pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME)).thenReturn(sqsMessagesReceivedCounter);
@@ -141,6 +144,7 @@ void setUp() {
141144
when(pluginMetrics.counter(S3_OBJECTS_EMPTY_METRIC_NAME)).thenReturn(s3ObjectsEmptyCounter);
142145
when(pluginMetrics.timer(SQS_MESSAGE_DELAY_METRIC_NAME)).thenReturn(sqsMessageDelayTimer);
143146
when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME)).thenReturn(mock(Counter.class));
147+
when(pluginMetrics.counter(SQS_RECEIVE_MESSAGES_FAILED_METRIC_NAME)).thenReturn(sqsReceiveMessageFailedCounter);
144148
when(pluginMetrics.counter(SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME)).thenReturn(sqsVisibilityTimeoutChangedCount);
145149
}
146150

@@ -531,6 +535,7 @@ void processSqsMessages_should_return_zero_messages_when_a_SqsException_is_throw
531535
final int messagesProcessed = createObjectUnderTest().processSqsMessages();
532536
assertThat(messagesProcessed, equalTo(0));
533537
verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
538+
verify(sqsReceiveMessageFailedCounter).increment();
534539
}
535540

536541
@Test

0 commit comments

Comments
 (0)