Skip to content
39 changes: 39 additions & 0 deletions src/main/java/com/uid2/optout/delta/StopReason.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.uid2.optout.delta;

/**
* Represents why delta production stopped.
* Used across all layers (batch, window, orchestrator) for consistent stop reason tracking.
*/
public enum StopReason {

/**
* Processing completed normally with work done, or still in progress.
*/
NONE,

/**
* No messages available in the SQS queue.
*/
QUEUE_EMPTY,

/**
* Messages exist in the queue but are too recent (less than deltaWindowSeconds old).
*/
MESSAGES_TOO_RECENT,

/**
* Hit the maximum messages per window limit.
*/
MESSAGE_LIMIT_EXCEEDED,

/**
* Pre-existing manual override was set (checked at job start).
*/
MANUAL_OVERRIDE_ACTIVE,

/**
* Circuit breaker triggered during processing (traffic spike detected).
*/
CIRCUIT_BREAKER_TRIGGERED
}

150 changes: 150 additions & 0 deletions src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package com.uid2.optout.sqs;

import com.uid2.optout.delta.StopReason;
import com.uid2.shared.optout.OptOutUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Applies parsing, validation, filtering, and deletion of corrupted SQS messages.
* Used by SqsWindowReader
*/
public class SqsBatchProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(SqsBatchProcessor.class);

private final SqsClient sqsClient;
private final String queueUrl;
private final int deltaWindowSeconds;

public SqsBatchProcessor(SqsClient sqsClient, String queueUrl, int deltaWindowSeconds) {
this.sqsClient = sqsClient;
this.queueUrl = queueUrl;
this.deltaWindowSeconds = deltaWindowSeconds;
}

/**
* Result of processing a batch of (10) messages from SQS.
* Encapsulates eligible messages and the reason for stopping (if any).
*/
public static class BatchProcessingResult {
private final List<SqsParsedMessage> eligibleMessages;
private final StopReason stopReason;

private BatchProcessingResult(List<SqsParsedMessage> eligibleMessages, StopReason stopReason) {
this.eligibleMessages = eligibleMessages;
this.stopReason = stopReason;
}

public static BatchProcessingResult withMessages(List<SqsParsedMessage> messages) {
return new BatchProcessingResult(messages, StopReason.NONE);
}

public static BatchProcessingResult messagesTooRecent() {
return new BatchProcessingResult(List.of(), StopReason.MESSAGES_TOO_RECENT);
}

public static BatchProcessingResult corruptMessagesDeleted() {
return new BatchProcessingResult(List.of(), StopReason.NONE);
}

public boolean hasMessages() {
return !eligibleMessages.isEmpty();
}

public StopReason getStopReason() {
return stopReason;
}

public List<SqsParsedMessage> getMessages() {
return eligibleMessages;
}
}

/**
* Processes a batch of messages: parses, validates, cleans up invalid messages,
* and filters for eligible messages based on age threshold (message is older than deltaWindowSeconds)
*
* @param messageBatch Raw messages from SQS
* @param batchNumber The batch number (for logging)
* @return BatchProcessingResult containing eligible messages and processing metadata
*/
public BatchProcessingResult processBatch(List<Message> messageBatch, int batchNumber) {
// Parse and sort messages by timestamp
List<SqsParsedMessage> parsedBatch = SqsMessageParser.parseAndSortMessages(messageBatch);

// Identify and delete corrupt messages
if (parsedBatch.size() < messageBatch.size()) {
List<Message> invalidMessages = identifyInvalidMessages(messageBatch, parsedBatch);
if (!invalidMessages.isEmpty()) {
LOGGER.error("sqs_error: found {} invalid messages in batch {}, deleting", invalidMessages.size(), batchNumber);
SqsMessageOperations.deleteMessagesFromSqs(this.sqsClient, this.queueUrl, invalidMessages);
}
}

// No valid messages after deleting corrupt ones, continue reading
if (parsedBatch.isEmpty()) {
LOGGER.info("no valid messages in batch {} after removing invalid messages", batchNumber);
return BatchProcessingResult.corruptMessagesDeleted();
}

// Check if the oldest message in this batch is too recent
long currentTime = OptOutUtils.nowEpochSeconds();
SqsParsedMessage oldestMessage = parsedBatch.get(0);

if (!isMessageEligible(oldestMessage, currentTime)) {
return BatchProcessingResult.messagesTooRecent();
}

// Filter for eligible messages (>= deltaWindowSeconds old)
List<SqsParsedMessage> eligibleMessages = filterEligibleMessages(parsedBatch, currentTime);

return BatchProcessingResult.withMessages(eligibleMessages);
}

/**
* Checks if a message is old enough to be processed.
*
* @param message The parsed message to check
* @param currentTime Current time in epoch seconds
* @return true if the message is at least deltaWindowSeconds old
*/
private boolean isMessageEligible(SqsParsedMessage message, long currentTime) {
return currentTime - message.getTimestamp() >= this.deltaWindowSeconds;
}

/**
* Filters messages to only include those where sufficient time has elapsed.
*
* @param messages List of parsed messages
* @param currentTime Current time in seconds
* @return List of messages that meet the time threshold
*/
List<SqsParsedMessage> filterEligibleMessages(List<SqsParsedMessage> messages, long currentTime) {
return messages.stream()
.filter(msg -> isMessageEligible(msg, currentTime))
.collect(Collectors.toList());
}

/**
* Identifies messages that failed to parse by comparing the original batch with parsed results.
*
* @param originalBatch The original list of messages from SQS
* @param parsedBatch The list of successfully parsed messages
* @return List of messages that failed to parse
*/
private List<Message> identifyInvalidMessages(List<Message> originalBatch, List<SqsParsedMessage> parsedBatch) {
Set<String> validIds = parsedBatch.stream()
.map(p -> p.getOriginalMessage().messageId())
.collect(Collectors.toSet());

return originalBatch.stream()
.filter(msg -> !validIds.contains(msg.messageId()))
.collect(Collectors.toList());
}
}
193 changes: 193 additions & 0 deletions src/main/java/com/uid2/optout/sqs/SqsMessageOperations.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package com.uid2.optout.sqs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Utility class for SQS message operations.
*/
public class SqsMessageOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(SqsMessageOperations.class);
private static final int SQS_MAX_DELETE_BATCH_SIZE = 10;

/**
* Result of getting queue attributes from SQS.
*/
public static class QueueAttributes {
private final int approximateNumberOfMessages;
private final int approximateNumberOfMessagesNotVisible;
private final int approximateNumberOfMessagesDelayed;

public QueueAttributes(int approximateNumberOfMessages,
int approximateNumberOfMessagesNotVisible,
int approximateNumberOfMessagesDelayed) {
this.approximateNumberOfMessages = approximateNumberOfMessages;
this.approximateNumberOfMessagesNotVisible = approximateNumberOfMessagesNotVisible;
this.approximateNumberOfMessagesDelayed = approximateNumberOfMessagesDelayed;
}

/** Number of messages available for retrieval from the queue (visible messages) */
public int getApproximateNumberOfMessages() {
return approximateNumberOfMessages;
}

/** Number of messages that are in flight (being processed by consumers, invisible) */
public int getApproximateNumberOfMessagesNotVisible() {
return approximateNumberOfMessagesNotVisible;
}

/** Number of messages in the queue that are delayed and not available yet */
public int getApproximateNumberOfMessagesDelayed() {
return approximateNumberOfMessagesDelayed;
}

/** Total messages in queue = visible + invisible + delayed */
public int getTotalMessages() {
return approximateNumberOfMessages + approximateNumberOfMessagesNotVisible + approximateNumberOfMessagesDelayed;
}

@Override
public String toString() {
return String.format("QueueAttributes{visible=%d, invisible=%d, delayed=%d, total=%d}",
approximateNumberOfMessages, approximateNumberOfMessagesNotVisible,
approximateNumberOfMessagesDelayed, getTotalMessages());
}
}

/**
* Gets queue attributes from SQS including message counts.
*
* @param sqsClient The SQS client
* @param queueUrl The queue URL
* @return QueueAttributes with message counts, or null if failed
*/
public static QueueAttributes getQueueAttributes(SqsClient sqsClient, String queueUrl) {
try {
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
.queueUrl(queueUrl)
.attributeNames(
QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES,
QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE,
QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED
)
.build();

GetQueueAttributesResponse response = sqsClient.getQueueAttributes(request);
Map<QueueAttributeName, String> attrs = response.attributes();

int visible = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES), 0);
int invisible = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE), 0);
int delayed = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED), 0);

QueueAttributes queueAttributes = new QueueAttributes(visible, invisible, delayed);
LOGGER.info("queue attributes: {}", queueAttributes);
return queueAttributes;

} catch (Exception e) {
LOGGER.info("error getting queue attributes", e);
return null;
}
}

private static int parseIntOrDefault(String value, int defaultValue) {
if (value == null) {
return defaultValue;
}
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
return defaultValue;
}
}

/**
* Receives a batch of messages from SQS.
*
* @param sqsClient The SQS client
* @param queueUrl The queue URL
* @param maxMessages Maximum number of messages to receive (max 10)
* @param visibilityTimeout Visibility timeout in seconds
* @return List of received messages
*/
public static List<Message> receiveMessagesFromSqs(
SqsClient sqsClient,
String queueUrl,
int maxMessages,
int visibilityTimeout) {

try {
ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(maxMessages)
.visibilityTimeout(visibilityTimeout)
.waitTimeSeconds(0) // Non-blocking poll
.messageSystemAttributeNames(MessageSystemAttributeName.SENT_TIMESTAMP) // Request SQS system timestamp
.build();

ReceiveMessageResponse response = sqsClient.receiveMessage(receiveRequest);

LOGGER.info("received {} messages", response.messages().size());
return response.messages();

} catch (Exception e) {
LOGGER.error("sqs_error: failed to receive messages", e);
return new ArrayList<>();
}
}

/**
* Deletes messages from SQS in batches (max 10 per batch).
*
* @param sqsClient The SQS client
* @param queueUrl The queue URL
* @param messages Messages to delete
*/
public static void deleteMessagesFromSqs(SqsClient sqsClient, String queueUrl, List<Message> messages) {
if (messages.isEmpty()) {
return;
}

try {
List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
int batchId = 0;
int totalDeleted = 0;

for (Message msg : messages) {
entries.add(DeleteMessageBatchRequestEntry.builder()
.id(String.valueOf(batchId++))
.receiptHandle(msg.receiptHandle())
.build());

// Send batch when we reach 10 messages or at the end
if (entries.size() == SQS_MAX_DELETE_BATCH_SIZE || batchId == messages.size()) {
DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(entries)
.build();

DeleteMessageBatchResponse deleteResponse = sqsClient.deleteMessageBatch(deleteRequest);

if (!deleteResponse.failed().isEmpty()) {
LOGGER.error("sqs_error: failed to delete {} messages", deleteResponse.failed().size());
} else {
totalDeleted += entries.size();
}

entries.clear();
}
}

LOGGER.info("deleted {} messages", totalDeleted);

} catch (Exception e) {
LOGGER.error("sqs_error: exception during message deletion", e);
}
}
}

Loading
Loading