diff --git a/src/main/java/com/uid2/optout/delta/StopReason.java b/src/main/java/com/uid2/optout/delta/StopReason.java new file mode 100644 index 00000000..0f61d7d5 --- /dev/null +++ b/src/main/java/com/uid2/optout/delta/StopReason.java @@ -0,0 +1,39 @@ +package com.uid2.optout.delta; + +/** + * Represents why delta production stopped. + * Used across all layers (batch, window, orchestrator) for consistent stop reason tracking. + */ +public enum StopReason { + + /** + * Processing completed normally with work done, or still in progress. + */ + NONE, + + /** + * No messages available in the SQS queue. + */ + QUEUE_EMPTY, + + /** + * Messages exist in the queue but are too recent (less than deltaWindowSeconds old). + */ + MESSAGES_TOO_RECENT, + + /** + * Hit the maximum messages per window limit. + */ + MESSAGE_LIMIT_EXCEEDED, + + /** + * Pre-existing manual override was set (checked at job start). + */ + MANUAL_OVERRIDE_ACTIVE, + + /** + * Circuit breaker triggered during processing (traffic spike detected). + */ + CIRCUIT_BREAKER_TRIGGERED +} + diff --git a/src/main/java/com/uid2/optout/vertx/SqsBatchProcessor.java b/src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java similarity index 52% rename from src/main/java/com/uid2/optout/vertx/SqsBatchProcessor.java rename to src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java index 0a656e98..afe08619 100644 --- a/src/main/java/com/uid2/optout/vertx/SqsBatchProcessor.java +++ b/src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java @@ -1,15 +1,15 @@ -package com.uid2.optout.vertx; +package com.uid2.optout.sqs; +import com.uid2.optout.delta.StopReason; import com.uid2.shared.optout.OptOutUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.Message; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * Applies parsing, validation, filtering, and deletion of corrupted SQS messages. @@ -29,46 +29,46 @@ public SqsBatchProcessor(SqsClient sqsClient, String queueUrl, int deltaWindowSe } /** - * Result of processing a batch of messages from SQS. - * Encapsulates eligible messages and metadata about the processing. + * Result of processing a batch of (10) messages from SQS. + * Encapsulates eligible messages and the reason for stopping (if any). */ public static class BatchProcessingResult { private final List eligibleMessages; - private final boolean shouldStopProcessing; + private final StopReason stopReason; - private BatchProcessingResult(List eligibleMessages, boolean shouldStopProcessing) { + private BatchProcessingResult(List eligibleMessages, StopReason stopReason) { this.eligibleMessages = eligibleMessages; - this.shouldStopProcessing = shouldStopProcessing; + this.stopReason = stopReason; } - public static BatchProcessingResult withEligibleMessages(List messages) { - return new BatchProcessingResult(messages, false); + public static BatchProcessingResult withMessages(List messages) { + return new BatchProcessingResult(messages, StopReason.NONE); } - public static BatchProcessingResult stopProcessing() { - return new BatchProcessingResult(new ArrayList<>(), true); + public static BatchProcessingResult messagesTooRecent() { + return new BatchProcessingResult(List.of(), StopReason.MESSAGES_TOO_RECENT); } - public static BatchProcessingResult empty() { - return new BatchProcessingResult(new ArrayList<>(), false); + public static BatchProcessingResult corruptMessagesDeleted() { + return new BatchProcessingResult(List.of(), StopReason.NONE); } - public boolean isEmpty() { - return eligibleMessages.isEmpty(); + public boolean hasMessages() { + return !eligibleMessages.isEmpty(); } - public boolean shouldStopProcessing() { - return shouldStopProcessing; + public StopReason getStopReason() { + return stopReason; } - public List getEligibleMessages() { + public List getMessages() { return eligibleMessages; } } /** * Processes a batch of messages: parses, validates, cleans up invalid messages, - * and filters for eligible messages based on age threshold (message is less than 5 minutes old) + * and filters for eligible messages based on age threshold (message is older than deltaWindowSeconds) * * @param messageBatch Raw messages from SQS * @param batchNumber The batch number (for logging) @@ -82,59 +82,53 @@ public BatchProcessingResult processBatch(List messageBatch, int batchN if (parsedBatch.size() < messageBatch.size()) { List invalidMessages = identifyInvalidMessages(messageBatch, parsedBatch); if (!invalidMessages.isEmpty()) { - LOGGER.error("Found {} invalid messages in batch {} (failed parsing). Deleting from queue.", - invalidMessages.size(), batchNumber); + LOGGER.error("sqs_error: found {} invalid messages in batch {}, deleting", invalidMessages.size(), batchNumber); SqsMessageOperations.deleteMessagesFromSqs(this.sqsClient, this.queueUrl, invalidMessages); } } - // If no valid messages, return empty result + // No valid messages after deleting corrupt ones, continue reading if (parsedBatch.isEmpty()) { - LOGGER.warn("No valid messages in batch {} (all failed parsing)", batchNumber); - return BatchProcessingResult.empty(); + LOGGER.info("no valid messages in batch {} after removing invalid messages", batchNumber); + return BatchProcessingResult.corruptMessagesDeleted(); } // Check if the oldest message in this batch is too recent long currentTime = OptOutUtils.nowEpochSeconds(); SqsParsedMessage oldestMessage = parsedBatch.get(0); - long messageAge = currentTime - oldestMessage.getTimestamp(); - if (messageAge < this.deltaWindowSeconds) { - // Signal to stop processing - messages are too recent - return BatchProcessingResult.stopProcessing(); + if (!isMessageEligible(oldestMessage, currentTime)) { + return BatchProcessingResult.messagesTooRecent(); } - // Filter for eligible messages (>= 5 minutes old) + // Filter for eligible messages (>= deltaWindowSeconds old) List eligibleMessages = filterEligibleMessages(parsedBatch, currentTime); - if (eligibleMessages.isEmpty()) { - LOGGER.debug("No eligible messages in batch {} (all too recent)", batchNumber); - return BatchProcessingResult.empty(); - } + return BatchProcessingResult.withMessages(eligibleMessages); + } - return BatchProcessingResult.withEligibleMessages(eligibleMessages); + /** + * Checks if a message is old enough to be processed. + * + * @param message The parsed message to check + * @param currentTime Current time in epoch seconds + * @return true if the message is at least deltaWindowSeconds old + */ + private boolean isMessageEligible(SqsParsedMessage message, long currentTime) { + return currentTime - message.timestamp() >= this.deltaWindowSeconds; } /** * Filters messages to only include those where sufficient time has elapsed. -. * + * * @param messages List of parsed messages * @param currentTime Current time in seconds * @return List of messages that meet the time threshold */ - public List filterEligibleMessages( - List messages, - long currentTime) { - - List eligibleMessages = new ArrayList<>(); - - for (SqsParsedMessage pm : messages) { - if (currentTime - pm.getTimestamp() >= this.deltaWindowSeconds) { - eligibleMessages.add(pm); - } - } - - return eligibleMessages; + List filterEligibleMessages(List messages, long currentTime) { + return messages.stream() + .filter(msg -> isMessageEligible(msg, currentTime)) + .collect(Collectors.toList()); } /** @@ -145,21 +139,12 @@ public List filterEligibleMessages( * @return List of messages that failed to parse */ private List identifyInvalidMessages(List originalBatch, List parsedBatch) { - // Create a set of message IDs from successfully parsed messages - Set validMessageIds = new HashSet<>(); - for (SqsParsedMessage parsed : parsedBatch) { - validMessageIds.add(parsed.getOriginalMessage().messageId()); - } + Set validIds = parsedBatch.stream() + .map(p -> p.originalMessage().messageId()) + .collect(Collectors.toSet()); - // Find messages that were not successfully parsed - List invalidMessages = new ArrayList<>(); - for (Message msg : originalBatch) { - if (!validMessageIds.contains(msg.messageId())) { - invalidMessages.add(msg); - } - } - - return invalidMessages; + return originalBatch.stream() + .filter(msg -> !validIds.contains(msg.messageId())) + .collect(Collectors.toList()); } } - diff --git a/src/main/java/com/uid2/optout/sqs/SqsMessageOperations.java b/src/main/java/com/uid2/optout/sqs/SqsMessageOperations.java new file mode 100644 index 00000000..afd651cc --- /dev/null +++ b/src/main/java/com/uid2/optout/sqs/SqsMessageOperations.java @@ -0,0 +1,213 @@ +package com.uid2.optout.sqs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Utility class for SQS message operations. + */ +public class SqsMessageOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(SqsMessageOperations.class); + private static final int SQS_MAX_DELETE_BATCH_SIZE = 10; + + /** + * Result of getting queue attributes from SQS. + */ + public static class QueueAttributes { + private final int approximateNumberOfMessages; + private final int approximateNumberOfMessagesNotVisible; + private final int approximateNumberOfMessagesDelayed; + + public QueueAttributes(int approximateNumberOfMessages, + int approximateNumberOfMessagesNotVisible, + int approximateNumberOfMessagesDelayed) { + this.approximateNumberOfMessages = approximateNumberOfMessages; + this.approximateNumberOfMessagesNotVisible = approximateNumberOfMessagesNotVisible; + this.approximateNumberOfMessagesDelayed = approximateNumberOfMessagesDelayed; + } + + /** Number of messages available for retrieval from the queue (visible messages) */ + public int getApproximateNumberOfMessages() { + return approximateNumberOfMessages; + } + + /** Number of messages that are in flight (being processed by consumers, invisible) */ + public int getApproximateNumberOfMessagesNotVisible() { + return approximateNumberOfMessagesNotVisible; + } + + /** Number of messages in the queue that are delayed and not available yet */ + public int getApproximateNumberOfMessagesDelayed() { + return approximateNumberOfMessagesDelayed; + } + + /** Total messages in queue = visible + invisible + delayed */ + public int getTotalMessages() { + return approximateNumberOfMessages + approximateNumberOfMessagesNotVisible + approximateNumberOfMessagesDelayed; + } + + @Override + public String toString() { + return String.format("QueueAttributes{visible=%d, invisible=%d, delayed=%d, total=%d}", + approximateNumberOfMessages, approximateNumberOfMessagesNotVisible, + approximateNumberOfMessagesDelayed, getTotalMessages()); + } + } + + /** + * Gets queue attributes from SQS including message counts. + * + * @param sqsClient The SQS client + * @param queueUrl The queue URL + * @return QueueAttributes with message counts, or null if failed + */ + public static QueueAttributes getQueueAttributes(SqsClient sqsClient, String queueUrl) { + try { + GetQueueAttributesRequest request = GetQueueAttributesRequest.builder() + .queueUrl(queueUrl) + .attributeNames( + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE, + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED + ) + .build(); + + GetQueueAttributesResponse response = sqsClient.getQueueAttributes(request); + Map attrs = response.attributes(); + + int visible = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES), 0); + int invisible = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE), 0); + int delayed = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED), 0); + + QueueAttributes queueAttributes = new QueueAttributes(visible, invisible, delayed); + LOGGER.info("queue attributes: {}", queueAttributes); + return queueAttributes; + + } catch (Exception e) { + LOGGER.info("sqs_error: error getting queue attributes", e); + return null; + } + } + + private static int parseIntOrDefault(String value, int defaultValue) { + if (value == null) { + return defaultValue; + } + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + return defaultValue; + } + } + + /** + * Receives a batch of messages from SQS. + * + * @param sqsClient The SQS client + * @param queueUrl The queue URL + * @param maxMessages Maximum number of messages to receive (max 10) + * @param visibilityTimeout Visibility timeout in seconds + * @return List of received messages + */ + public static List receiveMessagesFromSqs( + SqsClient sqsClient, + String queueUrl, + int maxMessages, + int visibilityTimeout) { + + try { + ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder() + .queueUrl(queueUrl) + .maxNumberOfMessages(maxMessages) + .visibilityTimeout(visibilityTimeout) + .waitTimeSeconds(0) // non-blocking poll + .messageSystemAttributeNames(MessageSystemAttributeName.SENT_TIMESTAMP) // request sqs system timestamp + .build(); + + ReceiveMessageResponse response = sqsClient.receiveMessage(receiveRequest); + + LOGGER.info("received {} messages", response.messages().size()); + return response.messages(); + + } catch (Exception e) { + LOGGER.error("sqs_error: failed to receive messages", e); + return new ArrayList<>(); + } + } + + /** + * Deletes messages from SQS in batches (max 10 per batch). + * Retries failed deletes as long as progress is being made. + * + * @param sqsClient The SQS client + * @param queueUrl The queue URL + * @param messages Messages to delete + */ + public static void deleteMessagesFromSqs(SqsClient sqsClient, String queueUrl, List messages) { + if (messages.isEmpty()) { + return; + } + + try { + int totalDeleted = 0; + List batch = new ArrayList<>(); + + for (int i = 0; i < messages.size(); i++) { + batch.add(DeleteMessageBatchRequestEntry.builder() + .id(String.valueOf(i)) + .receiptHandle(messages.get(i).receiptHandle()) + .build()); + + // send batch when we reach 10 messages or end of list + if (batch.size() == SQS_MAX_DELETE_BATCH_SIZE || i == messages.size() - 1) { + totalDeleted += deleteBatchWithRetry(sqsClient, queueUrl, batch); + batch.clear(); + } + } + + LOGGER.info("deleted {} messages", totalDeleted); + } catch (Exception e) { + LOGGER.error("sqs_error: error deleting messages", e); + } + } + + /** Deletes batch, retrying failed entries. Retries once unconditionally, then only while making progress. */ + private static int deleteBatchWithRetry(SqsClient sqsClient, String queueUrl, List entries) { + int deleted = 0; + List toDelete = entries; + boolean retriedOnce = false; + + while (!toDelete.isEmpty()) { + DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch( + DeleteMessageBatchRequest.builder().queueUrl(queueUrl).entries(toDelete).build()); + + int succeeded = response.successful().size(); + deleted += succeeded; + + if (response.failed().isEmpty()) { + break; // all done + } + + // retry once unconditionally, then only if making progress + if (retriedOnce && succeeded == 0) { + LOGGER.error("sqs_error: {} deletes failed with no progress", response.failed().size()); + break; + } + retriedOnce = true; + + // retry deletion on the failed messages only + var failedIds = response.failed().stream().map(BatchResultErrorEntry::id).collect(Collectors.toSet()); + toDelete = toDelete.stream().filter(e -> failedIds.contains(e.id())).toList(); + } + + return deleted; + } +} + diff --git a/src/main/java/com/uid2/optout/vertx/SqsMessageParser.java b/src/main/java/com/uid2/optout/sqs/SqsMessageParser.java similarity index 72% rename from src/main/java/com/uid2/optout/vertx/SqsMessageParser.java rename to src/main/java/com/uid2/optout/sqs/SqsMessageParser.java index 44a6c5e9..ac948b90 100644 --- a/src/main/java/com/uid2/optout/vertx/SqsMessageParser.java +++ b/src/main/java/com/uid2/optout/sqs/SqsMessageParser.java @@ -1,4 +1,4 @@ -package com.uid2.optout.vertx; +package com.uid2.optout.sqs; import com.uid2.shared.optout.OptOutUtils; import io.vertx.core.json.JsonObject; @@ -23,24 +23,28 @@ public class SqsMessageParser { * @return List of parsed messages sorted by timestamp (oldest first) */ public static List parseAndSortMessages(List messages) { - List parsedMessages = new ArrayList<>(); + List parsedMessages = new ArrayList<>(messages.size()); for (Message message : messages) { - try { - // Extract SQS system timestamp (in milliseconds), or use current time as fallback - long timestampSeconds = extractTimestamp(message); - // Parse message body + String traceId = null; + + try { + // parse message body JsonObject body = new JsonObject(message.body()); + traceId = body.getString("trace_id"); + String identityHash = body.getString("identity_hash"); String advertisingId = body.getString("advertising_id"); - String traceId = body.getString("trace_id"); String clientIp = body.getString("client_ip"); String email = body.getString("email"); String phone = body.getString("phone"); + // extract sqs system timestamp (in milliseconds), or use current time as fallback + long timestampSeconds = extractTimestamp(message, traceId); + if (identityHash == null || advertisingId == null) { - LOGGER.error("Invalid message format, skipping: {}", message.body()); + LOGGER.error("sqs_error: invalid message format, messageId={}, traceId={}", message.messageId(), traceId); continue; } @@ -48,18 +52,18 @@ public static List parseAndSortMessages(List messages byte[] idBytes = OptOutUtils.base64StringTobyteArray(advertisingId); if (hashBytes == null || idBytes == null) { - LOGGER.error("Invalid base64 encoding, skipping message"); + LOGGER.error("sqs_error: invalid base64 encoding, messageId={}, traceId={}", message.messageId(), traceId); continue; } parsedMessages.add(new SqsParsedMessage(message, hashBytes, idBytes, timestampSeconds, email, phone, clientIp, traceId)); } catch (Exception e) { - LOGGER.error("Error parsing SQS message", e); + LOGGER.error("sqs_error: error parsing message, messageId={}, traceId={}", message.messageId(), traceId, e); } } - // Sort by timestamp - parsedMessages.sort((a, b) -> Long.compare(a.getTimestamp(), b.getTimestamp())); + // sort by timestamp + parsedMessages.sort((a, b) -> Long.compare(a.timestamp(), b.timestamp())); return parsedMessages; } @@ -70,10 +74,10 @@ public static List parseAndSortMessages(List messages * @param message The SQS message * @return Timestamp in seconds */ - private static long extractTimestamp(Message message) { + private static long extractTimestamp(Message message, String traceId) { String sentTimestampStr = message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP); if (sentTimestampStr == null) { - LOGGER.warn("Message missing SentTimestamp attribute, using current time"); + LOGGER.info("message missing SentTimestamp, using current time instead, messageId={}, traceId={}", message.messageId(), traceId); return OptOutUtils.nowEpochSeconds(); } return Long.parseLong(sentTimestampStr) / 1000; // ms to seconds diff --git a/src/main/java/com/uid2/optout/sqs/SqsParsedMessage.java b/src/main/java/com/uid2/optout/sqs/SqsParsedMessage.java new file mode 100644 index 00000000..e0611445 --- /dev/null +++ b/src/main/java/com/uid2/optout/sqs/SqsParsedMessage.java @@ -0,0 +1,17 @@ +package com.uid2.optout.sqs; + +import software.amazon.awssdk.services.sqs.model.Message; + +/** + * Represents a parsed SQS message containing opt-out data. + */ +public record SqsParsedMessage( + Message originalMessage, + byte[] hashBytes, + byte[] idBytes, + long timestamp, + String email, + String phone, + String clientIp, + String traceId +) {} diff --git a/src/main/java/com/uid2/optout/sqs/SqsWindowReader.java b/src/main/java/com/uid2/optout/sqs/SqsWindowReader.java new file mode 100644 index 00000000..3a9fe711 --- /dev/null +++ b/src/main/java/com/uid2/optout/sqs/SqsWindowReader.java @@ -0,0 +1,147 @@ +package com.uid2.optout.sqs; + +import com.uid2.optout.delta.StopReason; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.util.ArrayList; +import java.util.List; + +/** + * Reads messages from SQS for complete 5-minute time windows. + * Handles accumulation of all messages for a window before returning. + * Limits messages per window to prevent memory issues. + */ +public class SqsWindowReader { + private static final Logger LOGGER = LoggerFactory.getLogger(SqsWindowReader.class); + + private final SqsClient sqsClient; + private final String queueUrl; + private final int maxMessagesPerPoll; + private final int visibilityTimeout; + private final int deltaWindowSeconds; + private final SqsBatchProcessor batchProcessor; + private int maxMessagesPerWindow; + + public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerPoll, + int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerWindow) { + this.sqsClient = sqsClient; + this.queueUrl = queueUrl; + this.maxMessagesPerPoll = maxMessagesPerPoll; + this.visibilityTimeout = visibilityTimeout; + this.deltaWindowSeconds = deltaWindowSeconds; + this.maxMessagesPerWindow = maxMessagesPerWindow; + this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds); + LOGGER.info("initialized: maxMessagesPerWindow={}, maxMessagesPerPoll={}, visibilityTimeout={}, deltaWindowSeconds={}", + maxMessagesPerWindow, maxMessagesPerPoll, visibilityTimeout, deltaWindowSeconds); + } + + /** + * Result of reading messages for a 5-minute window. + */ + public static class WindowReadResult { + private final List messages; + private final long windowStart; + private final StopReason stopReason; + private final int rawMessagesRead; // total messages pulled from SQS + + private WindowReadResult(List messages, long windowStart, StopReason stopReason, int rawMessagesRead) { + this.messages = messages; + this.windowStart = windowStart; + this.stopReason = stopReason; + this.rawMessagesRead = rawMessagesRead; + } + + public static WindowReadResult withMessages(List messages, long windowStart, int rawMessagesRead) { + return new WindowReadResult(messages, windowStart, StopReason.NONE, rawMessagesRead); + } + + public static WindowReadResult queueEmpty(List messages, long windowStart, int rawMessagesRead) { + return new WindowReadResult(messages, windowStart, StopReason.QUEUE_EMPTY, rawMessagesRead); + } + + public static WindowReadResult messagesTooRecent(List messages, long windowStart, int rawMessagesRead) { + return new WindowReadResult(messages, windowStart, StopReason.MESSAGES_TOO_RECENT, rawMessagesRead); + } + + public static WindowReadResult messageLimitExceeded(List messages, long windowStart, int rawMessagesRead) { + return new WindowReadResult(messages, windowStart, StopReason.MESSAGE_LIMIT_EXCEEDED, rawMessagesRead); + } + + public List getMessages() { return messages; } + public long getWindowStart() { return windowStart; } + public boolean isEmpty() { return messages.isEmpty(); } + public StopReason getStopReason() { return stopReason; } + /** Total raw messages pulled from SQS */ + public int getRawMessagesRead() { return rawMessagesRead; } + } + + /** + * Reads messages from SQS for one complete 5-minute window. + * Keeps reading batches and accumulating messages until: + * - We discover the next window + * - Queue is empty (no more messages) + * - Messages are too recent (all messages younger than deltaWindowSeconds) + * - Message count exceeds maxMessagesPerWindow + * + * @return WindowReadResult with messages for the window, or empty if done + */ + public WindowReadResult readWindow() { + List windowMessages = new ArrayList<>(); + long currentWindowStart = 0; + int batchNumber = 0; + int rawMessagesRead = 0; // track total messages pulled from SQS + + while (true) { + if (windowMessages.size() >= maxMessagesPerWindow) { + LOGGER.warn("high_message_volume: message limit exceeded while reading window, {} messages >= limit {}", windowMessages.size(), maxMessagesPerWindow); + return WindowReadResult.messageLimitExceeded(windowMessages, currentWindowStart, rawMessagesRead); + } + + // Read one batch from SQS (up to 10 messages) + List rawBatch = SqsMessageOperations.receiveMessagesFromSqs( + this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, this.visibilityTimeout); + + if (rawBatch.isEmpty()) { + return WindowReadResult.queueEmpty(windowMessages, currentWindowStart, rawMessagesRead); + } + + rawMessagesRead += rawBatch.size(); + + // parse, validate, filter + SqsBatchProcessor.BatchProcessingResult batchResult = batchProcessor.processBatch(rawBatch, batchNumber++); + + if (!batchResult.hasMessages()) { + if (batchResult.getStopReason() == StopReason.MESSAGES_TOO_RECENT) { + return WindowReadResult.messagesTooRecent(windowMessages, currentWindowStart, rawMessagesRead); + } + // Corrupt messages were deleted, continue reading + continue; + } + + // Add eligible messages to current window + boolean newWindow = false; + for (SqsParsedMessage msg : batchResult.getMessages()) { + long msgWindowStart = msg.timestamp(); + + // Discover start of window + if (currentWindowStart == 0) { + currentWindowStart = msgWindowStart; + } + + // Discover next window + if (msgWindowStart > currentWindowStart + this.deltaWindowSeconds) { + newWindow = true; + } + + windowMessages.add(msg); + } + + if (newWindow) { + return WindowReadResult.withMessages(windowMessages, currentWindowStart, rawMessagesRead); + } + } + } +} diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index 910a2c61..f0986907 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -2,6 +2,10 @@ import com.uid2.optout.Const; import com.uid2.optout.auth.InternalAuthMiddleware; +import com.uid2.optout.sqs.SqsWindowReader; +import com.uid2.optout.sqs.SqsParsedMessage; +import com.uid2.optout.sqs.SqsMessageOperations; +import com.uid2.optout.delta.StopReason; import com.uid2.shared.Utils; import com.uid2.shared.cloud.ICloudStorage; import com.uid2.shared.health.HealthComponent; @@ -392,7 +396,7 @@ private DeltaProductionResult produceBatchedDeltas() throws IOException { // If no messages, we're done (queue empty or messages too recent) if (windowResult.isEmpty()) { - stoppedDueToMessagesTooRecent = windowResult.stoppedDueToMessagesTooRecent(); + stoppedDueToMessagesTooRecent = windowResult.getStopReason() == StopReason.MESSAGES_TOO_RECENT; LOGGER.info("Delta production complete - no more eligible messages"); break; } @@ -409,8 +413,8 @@ private DeltaProductionResult produceBatchedDeltas() throws IOException { // Write all messages List sqsMessages = new ArrayList<>(); for (SqsParsedMessage msg : messages) { - writeOptOutEntry(deltaStream, msg.getHashBytes(), msg.getIdBytes(), msg.getTimestamp()); - sqsMessages.add(msg.getOriginalMessage()); + writeOptOutEntry(deltaStream, msg.hashBytes(), msg.idBytes(), msg.timestamp()); + sqsMessages.add(msg.originalMessage()); } // Upload and delete diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java index e8bd04b8..d0d5cd6d 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java @@ -1,5 +1,7 @@ package com.uid2.optout.vertx; +import com.uid2.optout.sqs.SqsParsedMessage; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,8 +153,8 @@ List parseFilterRules(JsonObject config) throws MalformedTraf } public boolean isDenylisted(SqsParsedMessage message) { - long timestamp = message.getTimestamp(); - String clientIp = message.getClientIp(); + long timestamp = message.timestamp(); + String clientIp = message.clientIp(); if (clientIp == null || clientIp.isEmpty()) { LOGGER.error("Request does not contain client IP, timestamp={}", timestamp); diff --git a/src/main/java/com/uid2/optout/vertx/SqsMessageOperations.java b/src/main/java/com/uid2/optout/vertx/SqsMessageOperations.java deleted file mode 100644 index 6c2715b1..00000000 --- a/src/main/java/com/uid2/optout/vertx/SqsMessageOperations.java +++ /dev/null @@ -1,140 +0,0 @@ -package com.uid2.optout.vertx; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.*; - -import java.util.ArrayList; -import java.util.List; - -/** - * Utility class for SQS message operations. - */ -public class SqsMessageOperations { - private static final Logger LOGGER = LoggerFactory.getLogger(SqsMessageOperations.class); - private static final int SQS_MAX_DELETE_BATCH_SIZE = 10; - - /** - * Receives all available messages from an SQS queue up to a maximum number of batches. - * - * @param sqsClient The SQS client - * @param queueUrl The queue URL - * @param maxMessagesPerPoll Maximum messages to receive per poll (max 10) - * @param visibilityTimeout Visibility timeout in seconds - * @param maxBatches Maximum number of receive batches - * @return List of all received messages - */ - public static List receiveAllAvailableMessages( - SqsClient sqsClient, - String queueUrl, - int maxMessagesPerPoll, - int visibilityTimeout, - int maxBatches) { - - List allMessages = new ArrayList<>(); - int batchCount = 0; - - // Keep receiving messages until we get an empty batch or hit the limit - while (batchCount < maxBatches) { - List batch = receiveMessagesFromSqs(sqsClient, queueUrl, maxMessagesPerPoll, visibilityTimeout); - if (batch.isEmpty()) { - break; - } - allMessages.addAll(batch); - batchCount++; - - // If we got fewer messages than the max (of 10), the queue is likely empty - if (batch.size() < maxMessagesPerPoll) { - break; - } - } - - return allMessages; - } - - /** - * Receives a batch of messages from SQS. - * - * @param sqsClient The SQS client - * @param queueUrl The queue URL - * @param maxMessages Maximum number of messages to receive (max 10) - * @param visibilityTimeout Visibility timeout in seconds - * @return List of received messages - */ - public static List receiveMessagesFromSqs( - SqsClient sqsClient, - String queueUrl, - int maxMessages, - int visibilityTimeout) { - - try { - ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder() - .queueUrl(queueUrl) - .maxNumberOfMessages(maxMessages) - .visibilityTimeout(visibilityTimeout) - .waitTimeSeconds(0) // Non-blocking poll - .messageSystemAttributeNames(MessageSystemAttributeName.SENT_TIMESTAMP) // Request SQS system timestamp - .build(); - - ReceiveMessageResponse response = sqsClient.receiveMessage(receiveRequest); - - LOGGER.debug("Received {} messages from SQS", response.messages().size()); - return response.messages(); - - } catch (Exception e) { - LOGGER.error("Error receiving messages from SQS", e); - return new ArrayList<>(); - } - } - - /** - * Deletes messages from SQS in batches (max 10 per batch). - * - * @param sqsClient The SQS client - * @param queueUrl The queue URL - * @param messages Messages to delete - */ - public static void deleteMessagesFromSqs(SqsClient sqsClient, String queueUrl, List messages) { - if (messages.isEmpty()) { - return; - } - - try { - List entries = new ArrayList<>(); - int batchId = 0; - int totalDeleted = 0; - - for (Message msg : messages) { - entries.add(DeleteMessageBatchRequestEntry.builder() - .id(String.valueOf(batchId++)) - .receiptHandle(msg.receiptHandle()) - .build()); - - // Send batch when we reach 10 messages or at the end - if (entries.size() == SQS_MAX_DELETE_BATCH_SIZE || batchId == messages.size()) { - DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder() - .queueUrl(queueUrl) - .entries(entries) - .build(); - - DeleteMessageBatchResponse deleteResponse = sqsClient.deleteMessageBatch(deleteRequest); - - if (!deleteResponse.failed().isEmpty()) { - LOGGER.error("Failed to delete {} messages from SQS", deleteResponse.failed().size()); - } else { - totalDeleted += entries.size(); - } - - entries.clear(); - } - } - - LOGGER.info("Deleted {} messages from SQS", totalDeleted); - - } catch (Exception e) { - LOGGER.error("Error deleting messages from SQS", e); - } - } -} - diff --git a/src/main/java/com/uid2/optout/vertx/SqsParsedMessage.java b/src/main/java/com/uid2/optout/vertx/SqsParsedMessage.java deleted file mode 100644 index 1ad8ba77..00000000 --- a/src/main/java/com/uid2/optout/vertx/SqsParsedMessage.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.uid2.optout.vertx; - -import software.amazon.awssdk.services.sqs.model.Message; - -/** - * Represents a parsed SQS message containing opt-out data. - */ -public class SqsParsedMessage { - private final Message originalMessage; - private final byte[] hashBytes; - private final byte[] idBytes; - private final long timestamp; - private final String email; - private final String phone; - private final String clientIp; - private final String traceId; - - public SqsParsedMessage(Message originalMessage, byte[] hashBytes, byte[] idBytes, long timestamp, String email, String phone, String clientIp, String traceId) { - this.originalMessage = originalMessage; - this.hashBytes = hashBytes; - this.idBytes = idBytes; - this.timestamp = timestamp; - this.email = email; - this.phone = phone; - this.clientIp = clientIp; - this.traceId = traceId; - } - - public Message getOriginalMessage() { - return originalMessage; - } - - public byte[] getHashBytes() { - return hashBytes; - } - - public byte[] getIdBytes() { - return idBytes; - } - - public long getTimestamp() { - return timestamp; - } - - public String getEmail() { - return email; - } - - public String getPhone() { - return phone; - } - - public String getClientIp() { - return clientIp; - } - - public String getTraceId() { - return traceId; - } -} - diff --git a/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java b/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java deleted file mode 100644 index 75368c62..00000000 --- a/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java +++ /dev/null @@ -1,129 +0,0 @@ -package com.uid2.optout.vertx; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.Message; - -import java.util.ArrayList; -import java.util.List; - -/** - * Reads messages from SQS for complete 5-minute time windows. - * Handles accumulation of all messages for a window before returning. - * Limits messages per window to prevent memory issues. - */ -public class SqsWindowReader { - private static final Logger LOGGER = LoggerFactory.getLogger(SqsWindowReader.class); - - private final SqsClient sqsClient; - private final String queueUrl; - private final int maxMessagesPerPoll; - private final int visibilityTimeout; - private final int deltaWindowSeconds; - private final int maxMessagesPerFile; - private final SqsBatchProcessor batchProcessor; - - public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerPoll, - int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerFile) { - this.sqsClient = sqsClient; - this.queueUrl = queueUrl; - this.maxMessagesPerPoll = maxMessagesPerPoll; - this.visibilityTimeout = visibilityTimeout; - this.deltaWindowSeconds = deltaWindowSeconds; - this.maxMessagesPerFile = maxMessagesPerFile; - this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds); - LOGGER.info("SqsWindowReader initialized with: maxMessagesPerFile: {}, maxMessagesPerPoll: {}, visibilityTimeout: {}, deltaWindowSeconds: {}", - maxMessagesPerFile, maxMessagesPerPoll, visibilityTimeout, deltaWindowSeconds); - } - - /** - * Result of reading messages for a 5-minute window. - */ - public static class WindowReadResult { - private final List messages; - private final long windowStart; - private final boolean stoppedDueToMessagesTooRecent; - - public WindowReadResult(List messages, long windowStart, - boolean stoppedDueToMessagesTooRecent) { - this.messages = messages; - this.windowStart = windowStart; - this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent; - } - - public List getMessages() { return messages; } - public long getWindowStart() { return windowStart; } - public boolean isEmpty() { return messages.isEmpty(); } - public boolean stoppedDueToMessagesTooRecent() { return stoppedDueToMessagesTooRecent; } - } - - /** - * Reads messages from SQS for one complete 5-minute window. - * Keeps reading batches and accumulating messages until: - * - We discover the next window - * - Queue is empty (no more messages) - * - Messages are too recent (all messages younger than 5 minutes) - * - Message limit is reached (memory protection) - * - * @return WindowReadResult with messages for the window, or empty if done - */ - public WindowReadResult readWindow() { - List windowMessages = new ArrayList<>(); - long currentWindowStart = 0; - - while (true) { - // Check if we've hit the message limit - if (windowMessages.size() >= this.maxMessagesPerFile) { - LOGGER.warn("Window message limit reached ({} messages). Truncating window starting at {} for memory protection.", - this.maxMessagesPerFile, currentWindowStart); - return new WindowReadResult(windowMessages, currentWindowStart, false); - } - - // Read one batch from SQS (up to 10 messages) - List rawBatch = SqsMessageOperations.receiveMessagesFromSqs( - this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, this.visibilityTimeout); - - if (rawBatch.isEmpty()) { - // Queue empty - return what we have - return new WindowReadResult(windowMessages, currentWindowStart, false); - } - - // Process batch: parse, validate, filter - SqsBatchProcessor.BatchProcessingResult batchResult = batchProcessor.processBatch(rawBatch, 0); - - if (batchResult.isEmpty()) { - if (batchResult.shouldStopProcessing()) { - // Messages too recent - return what we have - return new WindowReadResult(windowMessages, currentWindowStart, true); - } - // corrupt messages deleted, read next messages - continue; - } - - // Add eligible messages to current window - boolean newWindow = false; - for (SqsParsedMessage msg : batchResult.getEligibleMessages()) { - long msgWindowStart = (msg.getTimestamp() / this.deltaWindowSeconds) * this.deltaWindowSeconds; - - // discover start of window - if (currentWindowStart == 0) { - currentWindowStart = msgWindowStart; - } - - // discover new window - if (msgWindowStart > currentWindowStart + this.deltaWindowSeconds) { - newWindow = true; - } - - windowMessages.add(msg); - } - - if (newWindow) { - // close current window and return - return new WindowReadResult(windowMessages, currentWindowStart, false); - } - } - } -} - diff --git a/src/test/java/com/uid2/optout/vertx/SqsBatchProcessorTest.java b/src/test/java/com/uid2/optout/sqs/SqsBatchProcessorTest.java similarity index 96% rename from src/test/java/com/uid2/optout/vertx/SqsBatchProcessorTest.java rename to src/test/java/com/uid2/optout/sqs/SqsBatchProcessorTest.java index 6fc1c865..312eda06 100644 --- a/src/test/java/com/uid2/optout/vertx/SqsBatchProcessorTest.java +++ b/src/test/java/com/uid2/optout/sqs/SqsBatchProcessorTest.java @@ -1,8 +1,9 @@ -package com.uid2.optout.vertx; +package com.uid2.optout.sqs; import io.vertx.core.json.JsonObject; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; + import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; @@ -126,8 +127,8 @@ public void testFilterEligibleMessages_boundaryCases() { // Should only include the last two (>= threshold) assertEquals(2, result.size()); - assertEquals(currentTime - windowSeconds, result.get(0).getTimestamp()); - assertEquals(currentTime - windowSeconds - 1, result.get(1).getTimestamp()); + assertEquals(currentTime - windowSeconds, result.get(0).timestamp()); + assertEquals(currentTime - windowSeconds - 1, result.get(1).timestamp()); } @Test @@ -147,9 +148,9 @@ public void testFilterEligibleMessages_preservesOrder() { assertEquals(3, result.size()); // Verify order is preserved - assertEquals(100, result.get(0).getTimestamp()); - assertEquals(200, result.get(1).getTimestamp()); - assertEquals(300, result.get(2).getTimestamp()); + assertEquals(100, result.get(0).timestamp()); + assertEquals(200, result.get(1).timestamp()); + assertEquals(300, result.get(2).timestamp()); } @Test diff --git a/src/test/java/com/uid2/optout/sqs/SqsMessageOperationsTest.java b/src/test/java/com/uid2/optout/sqs/SqsMessageOperationsTest.java new file mode 100644 index 00000000..0988c2ee --- /dev/null +++ b/src/test/java/com/uid2/optout/sqs/SqsMessageOperationsTest.java @@ -0,0 +1,316 @@ +package com.uid2.optout.sqs; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +public class SqsMessageOperationsTest { + + private SqsClient mockSqsClient; + private static final String TEST_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/test-queue"; + + @BeforeEach + void setUp() { + mockSqsClient = mock(SqsClient.class); + } + + // ==================== getQueueAttributes tests ==================== + + @Test + void testGetQueueAttributes_success() { + GetQueueAttributesResponse response = GetQueueAttributesResponse.builder() + .attributes(Map.of( + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, "100", + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE, "50", + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, "25" + )) + .build(); + when(mockSqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(response); + + SqsMessageOperations.QueueAttributes attrs = SqsMessageOperations.getQueueAttributes(mockSqsClient, TEST_QUEUE_URL); + + assertNotNull(attrs); + assertEquals(100, attrs.getApproximateNumberOfMessages()); + assertEquals(50, attrs.getApproximateNumberOfMessagesNotVisible()); + assertEquals(25, attrs.getApproximateNumberOfMessagesDelayed()); + assertEquals(175, attrs.getTotalMessages()); + } + + @Test + void testGetQueueAttributes_exception() { + when(mockSqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))) + .thenThrow(new RuntimeException("SQS error")); + + SqsMessageOperations.QueueAttributes attrs = SqsMessageOperations.getQueueAttributes(mockSqsClient, TEST_QUEUE_URL); + + assertNull(attrs); + } + + @Test + void testGetQueueAttributes_missingAttributes() { + GetQueueAttributesResponse response = GetQueueAttributesResponse.builder() + .attributes(Map.of()) // empty + .build(); + when(mockSqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(response); + + SqsMessageOperations.QueueAttributes attrs = SqsMessageOperations.getQueueAttributes(mockSqsClient, TEST_QUEUE_URL); + + assertNotNull(attrs); + assertEquals(0, attrs.getApproximateNumberOfMessages()); + assertEquals(0, attrs.getTotalMessages()); + } + + // ==================== receiveMessagesFromSqs tests ==================== + + @Test + void testReceiveMessages_success() { + List messages = List.of( + Message.builder().messageId("1").receiptHandle("r1").build(), + Message.builder().messageId("2").receiptHandle("r2").build() + ); + ReceiveMessageResponse response = ReceiveMessageResponse.builder().messages(messages).build(); + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(response); + + List result = SqsMessageOperations.receiveMessagesFromSqs(mockSqsClient, TEST_QUEUE_URL, 10, 30); + + assertEquals(2, result.size()); + } + + @Test + void testReceiveMessages_exception() { + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenThrow(new RuntimeException("SQS error")); + + List result = SqsMessageOperations.receiveMessagesFromSqs(mockSqsClient, TEST_QUEUE_URL, 10, 30); + + assertTrue(result.isEmpty()); + } + + // ==================== deleteMessagesFromSqs tests ==================== + + @Test + void testDeleteMessages_emptyList() { + SqsMessageOperations.deleteMessagesFromSqs(mockSqsClient, TEST_QUEUE_URL, List.of()); + + verify(mockSqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + + @Test + void testDeleteMessages_allSucceed() { + List messages = createMessages(3); + DeleteMessageBatchResponse response = DeleteMessageBatchResponse.builder() + .successful( + DeleteMessageBatchResultEntry.builder().id("0").build(), + DeleteMessageBatchResultEntry.builder().id("1").build(), + DeleteMessageBatchResultEntry.builder().id("2").build() + ) + .failed(List.of()) + .build(); + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))).thenReturn(response); + + SqsMessageOperations.deleteMessagesFromSqs(mockSqsClient, TEST_QUEUE_URL, messages); + + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + verify(mockSqsClient, times(1)).deleteMessageBatch(captor.capture()); + assertEquals(3, captor.getValue().entries().size()); + } + + @Test + void testDeleteMessages_someFailThenSucceedOnRetry() { + List messages = createMessages(3); + + // First call: 2 succeed, 1 fails + DeleteMessageBatchResponse firstResponse = DeleteMessageBatchResponse.builder() + .successful( + DeleteMessageBatchResultEntry.builder().id("0").build(), + DeleteMessageBatchResultEntry.builder().id("1").build() + ) + .failed(BatchResultErrorEntry.builder().id("2").code("Error").build()) + .build(); + + // Second call (retry): the failed one succeeds + DeleteMessageBatchResponse secondResponse = DeleteMessageBatchResponse.builder() + .successful(DeleteMessageBatchResultEntry.builder().id("2").build()) + .failed(List.of()) + .build(); + + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(firstResponse) + .thenReturn(secondResponse); + + SqsMessageOperations.deleteMessagesFromSqs(mockSqsClient, TEST_QUEUE_URL, messages); + + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + verify(mockSqsClient, times(2)).deleteMessageBatch(captor.capture()); + List requests = captor.getAllValues(); + assertEquals(3, requests.get(0).entries().size()); // first attempt: all 3 + assertEquals(1, requests.get(1).entries().size()); // retry: only the failed one + } + + @Test + void testDeleteMessages_allFailRetryOnceUnconditionally() { + List messages = createMessages(2); + + // All fail on first and second attempt + DeleteMessageBatchResponse failResponse = DeleteMessageBatchResponse.builder() + .successful(List.of()) + .failed( + BatchResultErrorEntry.builder().id("0").code("Error").build(), + BatchResultErrorEntry.builder().id("1").code("Error").build() + ) + .build(); + + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))).thenReturn(failResponse); + + SqsMessageOperations.deleteMessagesFromSqs(mockSqsClient, TEST_QUEUE_URL, messages); + + // Should retry once even with no progress, then stop + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + verify(mockSqsClient, times(2)).deleteMessageBatch(captor.capture()); + List requests = captor.getAllValues(); + assertEquals(2, requests.get(0).entries().size()); // first attempt: all 2 + assertEquals(2, requests.get(1).entries().size()); // retry: still all 2 (none succeeded) + } + + @Test + void testDeleteMessages_retryWhileMakingProgress() { + List messages = createMessages(3); + + // First call: 1 succeeds, 2 fail + DeleteMessageBatchResponse first = DeleteMessageBatchResponse.builder() + .successful(DeleteMessageBatchResultEntry.builder().id("0").build()) + .failed( + BatchResultErrorEntry.builder().id("1").code("Error").build(), + BatchResultErrorEntry.builder().id("2").code("Error").build() + ) + .build(); + + // Second call: 1 succeeds, 1 fails (still making progress) + DeleteMessageBatchResponse second = DeleteMessageBatchResponse.builder() + .successful(DeleteMessageBatchResultEntry.builder().id("1").build()) + .failed(BatchResultErrorEntry.builder().id("2").code("Error").build()) + .build(); + + // Third call: last one succeeds + DeleteMessageBatchResponse third = DeleteMessageBatchResponse.builder() + .successful(DeleteMessageBatchResultEntry.builder().id("2").build()) + .failed(List.of()) + .build(); + + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(first) + .thenReturn(second) + .thenReturn(third); + + SqsMessageOperations.deleteMessagesFromSqs(mockSqsClient, TEST_QUEUE_URL, messages); + + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + verify(mockSqsClient, times(3)).deleteMessageBatch(captor.capture()); + List requests = captor.getAllValues(); + assertEquals(3, requests.get(0).entries().size()); // first: all 3 + assertEquals(2, requests.get(1).entries().size()); // second: 2 failed + assertEquals(1, requests.get(2).entries().size()); // third: 1 failed + } + + @Test + void testDeleteMessages_stopWhenNoProgressAfterRetry() { + List messages = createMessages(2); + + // First call: 1 succeeds, 1 fails + DeleteMessageBatchResponse first = DeleteMessageBatchResponse.builder() + .successful(DeleteMessageBatchResultEntry.builder().id("0").build()) + .failed(BatchResultErrorEntry.builder().id("1").code("Error").build()) + .build(); + + // Second call (retry): still fails, no progress + DeleteMessageBatchResponse second = DeleteMessageBatchResponse.builder() + .successful(List.of()) + .failed(BatchResultErrorEntry.builder().id("1").code("Error").build()) + .build(); + + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(first) + .thenReturn(second); + + SqsMessageOperations.deleteMessagesFromSqs(mockSqsClient, TEST_QUEUE_URL, messages); + + // Should stop after second call since no progress was made + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + verify(mockSqsClient, times(2)).deleteMessageBatch(captor.capture()); + List requests = captor.getAllValues(); + assertEquals(2, requests.get(0).entries().size()); // first: all 2 + assertEquals(1, requests.get(1).entries().size()); // retry: only the failed one + } + + @Test + void testDeleteMessages_batchesMoreThan10Messages() { + List messages = createMessages(15); + + DeleteMessageBatchResponse successResponse = DeleteMessageBatchResponse.builder() + .successful(List.of( + DeleteMessageBatchResultEntry.builder().id("0").build() + )) + .failed(List.of()) + .build(); + + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))).thenReturn(successResponse); + + SqsMessageOperations.deleteMessagesFromSqs(mockSqsClient, TEST_QUEUE_URL, messages); + + // Should be called twice: once for 10 messages, once for 5 + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + verify(mockSqsClient, times(2)).deleteMessageBatch(captor.capture()); + + List requests = captor.getAllValues(); + assertEquals(10, requests.get(0).entries().size()); + assertEquals(5, requests.get(1).entries().size()); + } + + @Test + void testDeleteMessages_exception() { + List messages = createMessages(3); + when(mockSqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenThrow(new RuntimeException("SQS error")); + + // Should not throw, just log error + assertDoesNotThrow(() -> + SqsMessageOperations.deleteMessagesFromSqs(mockSqsClient, TEST_QUEUE_URL, messages)); + } + + // ==================== QueueAttributes tests ==================== + + @Test + void testQueueAttributes_toString() { + SqsMessageOperations.QueueAttributes attrs = new SqsMessageOperations.QueueAttributes(100, 50, 25); + + String str = attrs.toString(); + + assertTrue(str.contains("visible=100")); + assertTrue(str.contains("invisible=50")); + assertTrue(str.contains("delayed=25")); + assertTrue(str.contains("total=175")); + } + + // ==================== Helper methods ==================== + + private List createMessages(int count) { + List messages = new ArrayList<>(); + for (int i = 0; i < count; i++) { + messages.add(Message.builder() + .messageId("msg-" + i) + .receiptHandle("receipt-" + i) + .build()); + } + return messages; + } +} diff --git a/src/test/java/com/uid2/optout/vertx/SqsMessageParserTest.java b/src/test/java/com/uid2/optout/sqs/SqsMessageParserTest.java similarity index 90% rename from src/test/java/com/uid2/optout/vertx/SqsMessageParserTest.java rename to src/test/java/com/uid2/optout/sqs/SqsMessageParserTest.java index 810a7a41..0f93c8b4 100644 --- a/src/test/java/com/uid2/optout/vertx/SqsMessageParserTest.java +++ b/src/test/java/com/uid2/optout/sqs/SqsMessageParserTest.java @@ -1,7 +1,8 @@ -package com.uid2.optout.vertx; +package com.uid2.optout.sqs; import io.vertx.core.json.JsonObject; import org.junit.jupiter.api.Test; + import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; import java.util.*; @@ -57,9 +58,9 @@ public void testParseAndSortMessages_validMessages() { assertEquals(3, result.size()); // Verify sorting (oldest first) - assertEquals(TEST_TIMESTAMP_SEC, result.get(0).getTimestamp()); - assertEquals(TEST_TIMESTAMP_SEC + 1, result.get(1).getTimestamp()); - assertEquals(TEST_TIMESTAMP_SEC + 2, result.get(2).getTimestamp()); + assertEquals(TEST_TIMESTAMP_SEC, result.get(0).timestamp()); + assertEquals(TEST_TIMESTAMP_SEC + 1, result.get(1).timestamp()); + assertEquals(TEST_TIMESTAMP_SEC + 2, result.get(2).timestamp()); } @Test @@ -158,7 +159,7 @@ public void testParseAndSortMessages_missingTimestamp() { assertEquals(1, result.size()); // Timestamp should be close to current time (within 10 seconds) long currentTime = System.currentTimeMillis() / 1000; - assertTrue(Math.abs(result.get(0).getTimestamp() - currentTime) < 10); + assertTrue(Math.abs(result.get(0).timestamp() - currentTime) < 10); } @Test @@ -172,8 +173,8 @@ public void testParseAndSortMessages_mixValidAndInvalid() { List result = SqsMessageParser.parseAndSortMessages(messages); assertEquals(2, result.size()); // Only valid messages - assertEquals(TEST_TIMESTAMP_SEC, result.get(0).getTimestamp()); - assertEquals(TEST_TIMESTAMP_SEC + 1, result.get(1).getTimestamp()); + assertEquals(TEST_TIMESTAMP_SEC, result.get(0).timestamp()); + assertEquals(TEST_TIMESTAMP_SEC + 1, result.get(1).timestamp()); } @Test @@ -184,7 +185,7 @@ public void testParseAndSortMessages_timestampConversion() { List result = SqsMessageParser.parseAndSortMessages(Arrays.asList(message)); assertEquals(1, result.size()); - assertEquals(1699308912L, result.get(0).getTimestamp()); // Should be in seconds + assertEquals(1699308912L, result.get(0).timestamp()); // Should be in seconds } @Test @@ -194,7 +195,7 @@ public void testParseAndSortMessages_preservesOriginalMessage() { List result = SqsMessageParser.parseAndSortMessages(Arrays.asList(originalMessage)); assertEquals(1, result.size()); - assertSame(originalMessage, result.get(0).getOriginalMessage()); + assertSame(originalMessage, result.get(0).originalMessage()); } @Test @@ -213,7 +214,7 @@ public void testParseAndSortMessages_sortingOrder() { assertEquals(5, result.size()); // Verify ascending order for (int i = 1; i < result.size(); i++) { - assertTrue(result.get(i - 1).getTimestamp() <= result.get(i).getTimestamp(), + assertTrue(result.get(i - 1).timestamp() <= result.get(i).timestamp(), "Messages should be sorted in ascending order by timestamp"); } } @@ -225,10 +226,10 @@ public void testParseAndSortMessages_parsesHashAndIdBytes() { List result = SqsMessageParser.parseAndSortMessages(Arrays.asList(message)); assertEquals(1, result.size()); - assertNotNull(result.get(0).getHashBytes()); - assertNotNull(result.get(0).getIdBytes()); - assertEquals(32, result.get(0).getHashBytes().length); - assertEquals(32, result.get(0).getIdBytes().length); + assertNotNull(result.get(0).hashBytes()); + assertNotNull(result.get(0).idBytes()); + assertEquals(32, result.get(0).hashBytes().length); + assertEquals(32, result.get(0).idBytes().length); } @Test @@ -264,8 +265,9 @@ public void testParseAndSortMessages_multipleValidMessages() { assertEquals(100, result.size()); // Verify all are sorted for (int i = 1; i < result.size(); i++) { - assertTrue(result.get(i - 1).getTimestamp() <= result.get(i).getTimestamp()); + assertTrue(result.get(i - 1).timestamp() <= result.get(i).timestamp()); } } } + diff --git a/src/test/java/com/uid2/optout/sqs/SqsWindowReaderTest.java b/src/test/java/com/uid2/optout/sqs/SqsWindowReaderTest.java new file mode 100644 index 00000000..044be5ae --- /dev/null +++ b/src/test/java/com/uid2/optout/sqs/SqsWindowReaderTest.java @@ -0,0 +1,290 @@ +package com.uid2.optout.sqs; + +import com.uid2.optout.delta.StopReason; +import io.vertx.core.json.JsonObject; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.*; + +import java.util.*; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +public class SqsWindowReaderTest { + + private SqsClient mockSqsClient; + private SqsWindowReader windowReader; + + private static final String TEST_QUEUE_URL = "https://sqs.test.amazonaws.com/123456789/test"; + private static final String VALID_HASH_BASE64 = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="; // 32 bytes + private static final String VALID_ID_BASE64 = "AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE="; // 32 bytes + private static final int MAX_MESSAGES_PER_POLL = 10; + private static final int VISIBILITY_TIMEOUT = 240; + private static final int DELTA_WINDOW_SECONDS = 300; // 5 minutes + private static final int MAX_MESSAGES_PER_WINDOW = 100; + + @BeforeEach + void setUp() { + mockSqsClient = mock(SqsClient.class); + windowReader = new SqsWindowReader( + mockSqsClient, TEST_QUEUE_URL, MAX_MESSAGES_PER_POLL, + VISIBILITY_TIMEOUT, DELTA_WINDOW_SECONDS, MAX_MESSAGES_PER_WINDOW + ); + } + + @Test + void testReadWindow_emptyQueue() { + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(List.of()).build()); + + SqsWindowReader.WindowReadResult result = windowReader.readWindow(); + + assertTrue(result.isEmpty()); + assertEquals(StopReason.QUEUE_EMPTY, result.getStopReason()); + assertEquals(0, result.getRawMessagesRead()); + } + + @Test + void testReadWindow_singleBatchSingleWindow() { + long windowStartSeconds = System.currentTimeMillis() / 1000 - 600; // 10 minutes ago + List messages = Arrays.asList( + createMessage(windowStartSeconds + 10), + createMessage(windowStartSeconds + 50), + createMessage(windowStartSeconds + 100) + ); + + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(List.of()).build()); + + SqsWindowReader.WindowReadResult result = windowReader.readWindow(); + + assertEquals(3, result.getMessages().size()); + assertEquals(StopReason.QUEUE_EMPTY, result.getStopReason()); + assertEquals(3, result.getRawMessagesRead()); + } + + @Test + void testReadWindow_multipleBatchesSameWindow() { + long windowStartSeconds = System.currentTimeMillis() / 1000 - 600; // 10 minutes ago + + List batch1 = Arrays.asList( + createMessage(windowStartSeconds + 10), + createMessage(windowStartSeconds + 20) + ); + List batch2 = Arrays.asList( + createMessage(windowStartSeconds + 100), + createMessage(windowStartSeconds + 150) + ); + + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(batch1).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(batch2).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(List.of()).build()); + + SqsWindowReader.WindowReadResult result = windowReader.readWindow(); + + assertEquals(4, result.getMessages().size()); + assertEquals(StopReason.QUEUE_EMPTY, result.getStopReason()); + assertEquals(4, result.getRawMessagesRead()); + } + + @Test + void testReadWindow_messagesTooRecent() { + long currentTimeMs = System.currentTimeMillis(); + List messages = Arrays.asList( + createMessageWithTimestampMs(currentTimeMs - 1000), // 1 second ago + createMessageWithTimestampMs(currentTimeMs - 2000) // 2 seconds ago + ); + + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()); + + SqsWindowReader.WindowReadResult result = windowReader.readWindow(); + + assertTrue(result.isEmpty()); + assertEquals(StopReason.MESSAGES_TOO_RECENT, result.getStopReason()); + assertEquals(2, result.getRawMessagesRead()); + } + + @Test + void testReadWindow_messageLimitExceeded() { + SqsWindowReader smallLimitReader = new SqsWindowReader( + mockSqsClient, TEST_QUEUE_URL, MAX_MESSAGES_PER_POLL, + VISIBILITY_TIMEOUT, DELTA_WINDOW_SECONDS, 5 // Only 5 messages max + ); + + long windowStartSeconds = System.currentTimeMillis() / 1000 - 600; + List batch = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + batch.add(createMessage(windowStartSeconds + i * 10)); + } + List batch2 = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + batch2.add(createMessage(windowStartSeconds + i * 10)); + } + + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(batch).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(batch2).build()); + + SqsWindowReader.WindowReadResult result = smallLimitReader.readWindow(); + + assertEquals(StopReason.MESSAGE_LIMIT_EXCEEDED, result.getStopReason()); + assertEquals(10, result.getMessages().size()); + } + + @Test + void testReadWindow_discoversNewWindow() { + long window1StartSeconds = System.currentTimeMillis() / 1000 - 900; // 15 minutes ago + long window2StartSeconds = window1StartSeconds + DELTA_WINDOW_SECONDS + 100; // Next window + + List messages = Arrays.asList( + createMessage(window1StartSeconds + 10), + createMessage(window1StartSeconds + 50), + createMessage(window2StartSeconds + 10) // Next window + ); + + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()); + + SqsWindowReader.WindowReadResult result = windowReader.readWindow(); + + assertEquals(3, result.getMessages().size()); + assertEquals(StopReason.NONE, result.getStopReason()); + assertEquals(window1StartSeconds + 10, result.getWindowStart()); + } + + @Test + void testReadWindow_multipleWindowsMultipleBatchesPerWindow() { + // Window 1: 2 batches, then discovers window 2 + // Window 2: 2 batches (must be > 5 min old for eligibility) + long window1StartSeconds = System.currentTimeMillis() / 1000 - 1200; // 20 minutes ago + long window2StartSeconds = window1StartSeconds + DELTA_WINDOW_SECONDS + 100; // ~12 minutes ago + + List window1Batch1 = Arrays.asList( + createMessage(window1StartSeconds + 10), + createMessage(window1StartSeconds + 20), + createMessage(window1StartSeconds + 30) + ); + + List window1Batch2 = Arrays.asList( + createMessage(window1StartSeconds + 100), + createMessage(window1StartSeconds + 150), + createMessage(window1StartSeconds + 200) + ); + + // Mixed batch triggers new window detection + List mixedBatch = Arrays.asList( + createMessage(window1StartSeconds + 250), + createMessage(window2StartSeconds + 10), + createMessage(window2StartSeconds + 20) + ); + + List window2Batch1 = Arrays.asList( + createMessage(window2StartSeconds + 50), + createMessage(window2StartSeconds + 80) + ); + + List window2Batch2 = Arrays.asList( + createMessage(window2StartSeconds + 120), + createMessage(window2StartSeconds + 150) + ); + + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(window1Batch1).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(window1Batch2).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(mixedBatch).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(window2Batch1).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(window2Batch2).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(List.of()).build()); + + // First readWindow() returns window 1 + mixed batch (new window detected) + SqsWindowReader.WindowReadResult result1 = windowReader.readWindow(); + + assertEquals(9, result1.getMessages().size()); + assertEquals(StopReason.NONE, result1.getStopReason()); + assertEquals(window1StartSeconds + 10, result1.getWindowStart()); + assertEquals(9, result1.getRawMessagesRead()); + + // Second readWindow() processes window 2 + SqsWindowReader.WindowReadResult result2 = windowReader.readWindow(); + + assertEquals(4, result2.getMessages().size()); + assertEquals(StopReason.QUEUE_EMPTY, result2.getStopReason()); + assertEquals(window2StartSeconds + 50, result2.getWindowStart()); + assertEquals(4, result2.getRawMessagesRead()); + } + + @Test + void testReadWindow_corruptMessagesSkipped() { + long windowStartSeconds = System.currentTimeMillis() / 1000 - 600; + + // Corrupt message (missing required fields) + Message corruptMessage = Message.builder() + .body("{}") + .attributes(Map.of(MessageSystemAttributeName.SENT_TIMESTAMP, String.valueOf((windowStartSeconds + 10) * 1000))) + .messageId("corrupt-1") + .receiptHandle("receipt-1") + .build(); + + List validBatch = Arrays.asList(createMessage(windowStartSeconds + 100)); + + when(mockSqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(List.of(corruptMessage)).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(validBatch).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(List.of()).build()); + + SqsWindowReader.WindowReadResult result = windowReader.readWindow(); + + assertEquals(1, result.getMessages().size()); + assertEquals(StopReason.QUEUE_EMPTY, result.getStopReason()); + } + + @Test + void testWindowReadResult_factoryMethods() { + List messages = List.of(); + long windowStart = 1700000000L; + + SqsWindowReader.WindowReadResult empty = SqsWindowReader.WindowReadResult.queueEmpty(messages, windowStart, 5); + assertEquals(StopReason.QUEUE_EMPTY, empty.getStopReason()); + assertEquals(5, empty.getRawMessagesRead()); + + SqsWindowReader.WindowReadResult tooRecent = SqsWindowReader.WindowReadResult.messagesTooRecent(messages, windowStart, 5); + assertEquals(StopReason.MESSAGES_TOO_RECENT, tooRecent.getStopReason()); + assertEquals(5, tooRecent.getRawMessagesRead()); + + SqsWindowReader.WindowReadResult limitExceeded = SqsWindowReader.WindowReadResult.messageLimitExceeded(messages, windowStart, 100); + assertEquals(StopReason.MESSAGE_LIMIT_EXCEEDED, limitExceeded.getStopReason()); + assertEquals(100, limitExceeded.getRawMessagesRead()); + + SqsWindowReader.WindowReadResult withMessages = SqsWindowReader.WindowReadResult.withMessages(messages, windowStart, 10); + assertEquals(StopReason.NONE, withMessages.getStopReason()); + assertEquals(10, withMessages.getRawMessagesRead()); + } + + // ==================== Helper methods ==================== + + private Message createMessage(long timestampSeconds) { + return createMessageWithTimestampMs(timestampSeconds * 1000); + } + + private Message createMessageWithTimestampMs(long timestampMs) { + JsonObject body = new JsonObject() + .put("identity_hash", VALID_HASH_BASE64) + .put("advertising_id", VALID_ID_BASE64); + + Map attributes = new HashMap<>(); + attributes.put(MessageSystemAttributeName.SENT_TIMESTAMP, String.valueOf(timestampMs)); + + return Message.builder() + .body(body.encode()) + .attributes(attributes) + .messageId("msg-" + UUID.randomUUID()) + .receiptHandle("receipt-" + UUID.randomUUID()) + .build(); + } +} diff --git a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java index a46023f6..85ee532d 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java @@ -16,6 +16,7 @@ import java.io.InputStream; import java.util.*; +import java.util.concurrent.CountDownLatch; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -77,7 +78,7 @@ public void tearDown(TestContext context) { vertx.close(context.asyncAssertSuccess()); } } - + private Message createMessage(String hash, String id, long timestampMs) { JsonObject body = new JsonObject() .put("identity_hash", hash) @@ -325,6 +326,9 @@ public void testDeltaProduceEndpoint_unauthorized(TestContext context) { public void testDeltaProduceEndpoint_concurrentJobPrevention(TestContext context) throws Exception { Async async = context.async(); + // Latch to keep job running until we verify the conflict response + CountDownLatch uploadLatch = new CountDownLatch(1); + // Create messages that will take some time to process long oldTime = System.currentTimeMillis() - 400_000; List messages = Arrays.asList( @@ -339,7 +343,11 @@ public void testDeltaProduceEndpoint_concurrentJobPrevention(TestContext context when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) .thenReturn(DeleteMessageBatchResponse.builder().build()); - doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + // Block upload until latch is released + doAnswer(inv -> { + uploadLatch.await(); + return null; + }).when(cloudStorage).upload(any(InputStream.class), anyString()); int port = Const.Port.ServicePortForOptOut + 1; @@ -375,6 +383,8 @@ public void testDeltaProduceEndpoint_concurrentJobPrevention(TestContext context context.assertEquals("conflict", response.getString("status")); context.assertTrue(response.getString("message").contains("already running")); + // Release the latch to let the first job complete + uploadLatch.countDown(); async.complete(); })); } diff --git a/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java b/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java index 63f6807c..88881c56 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java @@ -1,5 +1,7 @@ package com.uid2.optout.vertx; +import com.uid2.optout.sqs.SqsParsedMessage; + import org.junit.After; import org.junit.Before; import org.junit.Test;