Skip to content

Commit 1c82e49

Browse files
authored
Log the number of messages received from the SQS queue, including a count of the number of messages that will need to be processed. Also, include logging of deletes at the debug level. (opensearch-project#2011)
Signed-off-by: David Venable <[email protected]>
1 parent bdbb896 commit 1c82e49

File tree

1 file changed

+19
-9
lines changed
  • data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source

1 file changed

+19
-9
lines changed

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/SqsWorker.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public SqsWorker(final SqsClient sqsClient,
6969
@Override
7070
public void run() {
7171

72-
while(true) {
72+
while (true) {
7373
int messagesProcessed = 0;
7474
try {
7575
messagesProcessed = processSqsMessages();
@@ -152,26 +152,35 @@ private ParsedMessage convertS3EventMessages(final Message message) {
152152
}
153153

154154
private List<DeleteMessageBatchRequestEntry> processS3EventNotificationRecords(final Collection<ParsedMessage> s3EventNotificationRecords) {
155-
List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntryCollection = new ArrayList<>();
155+
final List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntryCollection = new ArrayList<>();
156+
final List<ParsedMessage> parsedMessagesToRead = new ArrayList<>();
157+
156158
for (ParsedMessage parsedMessage : s3EventNotificationRecords) {
157159
if (parsedMessage.failedParsing) {
158160
sqsMessagesFailedCounter.increment();
159161
if (s3SourceConfig.getOnErrorOption().equals(OnErrorOption.DELETE_MESSAGES)) {
160162
deleteMessageBatchRequestEntryCollection.add(buildDeleteMessageBatchRequestEntry(parsedMessage.message));
161163
}
162-
}
163-
else {
164+
} else {
164165
final List<S3EventNotification.S3EventNotificationRecord> notificationRecords = parsedMessage.notificationRecords;
165-
if(!notificationRecords.isEmpty() && isEventNameCreated(notificationRecords.get(0))) {
166-
final S3ObjectReference s3ObjectReference = populateS3Reference(notificationRecords.get(0));
167-
final Optional<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference);
168-
deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add);
166+
if (!notificationRecords.isEmpty() && isEventNameCreated(notificationRecords.get(0))) {
167+
parsedMessagesToRead.add(parsedMessage);
169168
} else {
170169
// Add SQS message to delete collection if the eventName is not ObjectCreated
171170
deleteMessageBatchRequestEntryCollection.add(buildDeleteMessageBatchRequestEntry(parsedMessage.message));
172171
}
173172
}
174173
}
174+
175+
LOG.info("Received {} messages from SQS. Read {} messages from S3.", s3EventNotificationRecords.size(), parsedMessagesToRead.size());
176+
177+
for (ParsedMessage parsedMessage : parsedMessagesToRead) {
178+
final List<S3EventNotification.S3EventNotificationRecord> notificationRecords = parsedMessage.notificationRecords;
179+
final S3ObjectReference s3ObjectReference = populateS3Reference(notificationRecords.get(0));
180+
final Optional<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference);
181+
deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add);
182+
}
183+
175184
return deleteMessageBatchRequestEntryCollection;
176185
}
177186

@@ -194,6 +203,7 @@ private Optional<DeleteMessageBatchRequestEntry> processS3Object(
194203
}
195204

196205
private void deleteSqsMessages(final List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntryCollection) {
206+
LOG.debug("Deleting {} messages from SQS.", deleteMessageBatchRequestEntryCollection.size());
197207
final DeleteMessageBatchRequest deleteMessageBatchRequest = buildDeleteMessageBatchRequest(deleteMessageBatchRequestEntryCollection);
198208
final DeleteMessageBatchResponse deleteMessageBatchResponse = sqsClient.deleteMessageBatch(deleteMessageBatchRequest);
199209
if (deleteMessageBatchResponse.hasSuccessful()) {
@@ -224,7 +234,7 @@ private boolean isEventNameCreated(final S3EventNotification.S3EventNotification
224234
private S3ObjectReference populateS3Reference(final S3EventNotification.S3EventNotificationRecord s3EventNotificationRecord) {
225235
final S3EventNotification.S3Entity s3Entity = s3EventNotificationRecord.getS3();
226236
return S3ObjectReference.bucketAndKey(s3Entity.getBucket().getName(),
227-
s3Entity.getObject().getUrlDecodedKey())
237+
s3Entity.getObject().getUrlDecodedKey())
228238
.build();
229239
}
230240

0 commit comments

Comments
 (0)