Skip to content
39 changes: 39 additions & 0 deletions src/main/java/com/uid2/optout/delta/StopReason.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.uid2.optout.delta;

/**
* Represents why delta production stopped.
* Used across all layers (batch, window, orchestrator) for consistent stop reason tracking.
*/
public enum StopReason {

/**
* Processing completed normally with work done, or still in progress.
*/
NONE,

/**
* No messages available in the SQS queue.
*/
QUEUE_EMPTY,

/**
* Messages exist in the queue but are too recent (less than deltaWindowSeconds old).
*/
MESSAGES_TOO_RECENT,

/**
* Hit the maximum messages per window limit.
*/
MESSAGE_LIMIT_EXCEEDED,

/**
* Pre-existing manual override was set (checked at job start).
*/
MANUAL_OVERRIDE_ACTIVE,

/**
* Circuit breaker triggered during processing (traffic spike detected).
*/
CIRCUIT_BREAKER_TRIGGERED
}

Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.uid2.optout.vertx;
package com.uid2.optout.sqs;

import com.uid2.optout.delta.StopReason;
import com.uid2.shared.optout.OptOutUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Applies parsing, validation, filtering, and deletion of corrupted SQS messages.
Expand All @@ -29,46 +29,46 @@ public SqsBatchProcessor(SqsClient sqsClient, String queueUrl, int deltaWindowSe
}

/**
* Result of processing a batch of messages from SQS.
* Encapsulates eligible messages and metadata about the processing.
* Result of processing a batch of (10) messages from SQS.
* Encapsulates eligible messages and the reason for stopping (if any).
*/
public static class BatchProcessingResult {
private final List<SqsParsedMessage> eligibleMessages;
private final boolean shouldStopProcessing;
private final StopReason stopReason;

private BatchProcessingResult(List<SqsParsedMessage> eligibleMessages, boolean shouldStopProcessing) {
private BatchProcessingResult(List<SqsParsedMessage> eligibleMessages, StopReason stopReason) {
this.eligibleMessages = eligibleMessages;
this.shouldStopProcessing = shouldStopProcessing;
this.stopReason = stopReason;
}

public static BatchProcessingResult withEligibleMessages(List<SqsParsedMessage> messages) {
return new BatchProcessingResult(messages, false);
public static BatchProcessingResult withMessages(List<SqsParsedMessage> messages) {
return new BatchProcessingResult(messages, StopReason.NONE);
}

public static BatchProcessingResult stopProcessing() {
return new BatchProcessingResult(new ArrayList<>(), true);
public static BatchProcessingResult messagesTooRecent() {
return new BatchProcessingResult(List.of(), StopReason.MESSAGES_TOO_RECENT);
}

public static BatchProcessingResult empty() {
return new BatchProcessingResult(new ArrayList<>(), false);
public static BatchProcessingResult corruptMessagesDeleted() {
return new BatchProcessingResult(List.of(), StopReason.NONE);
}

public boolean isEmpty() {
return eligibleMessages.isEmpty();
public boolean hasMessages() {
return !eligibleMessages.isEmpty();
}

public boolean shouldStopProcessing() {
return shouldStopProcessing;
public StopReason getStopReason() {
return stopReason;
}

public List<SqsParsedMessage> getEligibleMessages() {
public List<SqsParsedMessage> getMessages() {
return eligibleMessages;
}
}

/**
* Processes a batch of messages: parses, validates, cleans up invalid messages,
* and filters for eligible messages based on age threshold (message is less than 5 minutes old)
* and filters for eligible messages based on age threshold (message is older than deltaWindowSeconds)
*
* @param messageBatch Raw messages from SQS
* @param batchNumber The batch number (for logging)
Expand All @@ -82,59 +82,53 @@ public BatchProcessingResult processBatch(List<Message> messageBatch, int batchN
if (parsedBatch.size() < messageBatch.size()) {
List<Message> invalidMessages = identifyInvalidMessages(messageBatch, parsedBatch);
if (!invalidMessages.isEmpty()) {
LOGGER.error("Found {} invalid messages in batch {} (failed parsing). Deleting from queue.",
invalidMessages.size(), batchNumber);
LOGGER.error("sqs_error: found {} invalid messages in batch {}, deleting", invalidMessages.size(), batchNumber);
SqsMessageOperations.deleteMessagesFromSqs(this.sqsClient, this.queueUrl, invalidMessages);
}
}

// If no valid messages, return empty result
// No valid messages after deleting corrupt ones, continue reading
if (parsedBatch.isEmpty()) {
LOGGER.warn("No valid messages in batch {} (all failed parsing)", batchNumber);
return BatchProcessingResult.empty();
LOGGER.info("no valid messages in batch {} after removing invalid messages", batchNumber);
return BatchProcessingResult.corruptMessagesDeleted();
}

// Check if the oldest message in this batch is too recent
long currentTime = OptOutUtils.nowEpochSeconds();
SqsParsedMessage oldestMessage = parsedBatch.get(0);
long messageAge = currentTime - oldestMessage.getTimestamp();

if (messageAge < this.deltaWindowSeconds) {
// Signal to stop processing - messages are too recent
return BatchProcessingResult.stopProcessing();
if (!isMessageEligible(oldestMessage, currentTime)) {
return BatchProcessingResult.messagesTooRecent();
}

// Filter for eligible messages (>= 5 minutes old)
// Filter for eligible messages (>= deltaWindowSeconds old)
List<SqsParsedMessage> eligibleMessages = filterEligibleMessages(parsedBatch, currentTime);

if (eligibleMessages.isEmpty()) {
LOGGER.debug("No eligible messages in batch {} (all too recent)", batchNumber);
return BatchProcessingResult.empty();
}
return BatchProcessingResult.withMessages(eligibleMessages);
}

return BatchProcessingResult.withEligibleMessages(eligibleMessages);
/**
* Checks if a message is old enough to be processed.
*
* @param message The parsed message to check
* @param currentTime Current time in epoch seconds
* @return true if the message is at least deltaWindowSeconds old
*/
private boolean isMessageEligible(SqsParsedMessage message, long currentTime) {
return currentTime - message.timestamp() >= this.deltaWindowSeconds;
}

/**
* Filters messages to only include those where sufficient time has elapsed.
. *
*
* @param messages List of parsed messages
* @param currentTime Current time in seconds
* @return List of messages that meet the time threshold
*/
public List<SqsParsedMessage> filterEligibleMessages(
List<SqsParsedMessage> messages,
long currentTime) {

List<SqsParsedMessage> eligibleMessages = new ArrayList<>();

for (SqsParsedMessage pm : messages) {
if (currentTime - pm.getTimestamp() >= this.deltaWindowSeconds) {
eligibleMessages.add(pm);
}
}

return eligibleMessages;
List<SqsParsedMessage> filterEligibleMessages(List<SqsParsedMessage> messages, long currentTime) {
return messages.stream()
.filter(msg -> isMessageEligible(msg, currentTime))
.collect(Collectors.toList());
}

/**
Expand All @@ -145,21 +139,12 @@ public List<SqsParsedMessage> filterEligibleMessages(
* @return List of messages that failed to parse
*/
private List<Message> identifyInvalidMessages(List<Message> originalBatch, List<SqsParsedMessage> parsedBatch) {
// Create a set of message IDs from successfully parsed messages
Set<String> validMessageIds = new HashSet<>();
for (SqsParsedMessage parsed : parsedBatch) {
validMessageIds.add(parsed.getOriginalMessage().messageId());
}
Set<String> validIds = parsedBatch.stream()
.map(p -> p.originalMessage().messageId())
.collect(Collectors.toSet());

// Find messages that were not successfully parsed
List<Message> invalidMessages = new ArrayList<>();
for (Message msg : originalBatch) {
if (!validMessageIds.contains(msg.messageId())) {
invalidMessages.add(msg);
}
}

return invalidMessages;
return originalBatch.stream()
.filter(msg -> !validIds.contains(msg.messageId()))
.collect(Collectors.toList());
}
}

Loading
Loading