Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e1da3fc
traffic components
Ian-Nara Dec 9, 2025
d6f6d16
addressing PR comments
Ian-Nara Dec 9, 2025
193ab19
update tests
Ian-Nara Dec 9, 2025
8acd1a8
clean up files
Ian-Nara Dec 9, 2025
d9e2641
test fix
Ian-Nara Dec 9, 2025
3aa5c0d
Merge branch 'ian-UID2-6345-sqs-components' into ian-UID2-6345-traffi…
Ian-Nara Dec 9, 2025
609c347
clean up files
Ian-Nara Dec 9, 2025
950af13
try make git render file recreated
Ian-Nara Dec 9, 2025
7c49222
try make git render file recreated
Ian-Nara Dec 9, 2025
6922b0f
create empty
Ian-Nara Dec 9, 2025
52e7818
create empty
Ian-Nara Dec 9, 2025
0eb06a2
file rename
Ian-Nara Dec 10, 2025
952dcf3
file update
Ian-Nara Dec 10, 2025
caf3d0e
file update
Ian-Nara Dec 10, 2025
813cd06
file updates
Ian-Nara Dec 10, 2025
e02e4f2
update comments
Ian-Nara Dec 10, 2025
0979fc1
Merge branch 'main' into ian-UID2-6345-traffic-components
Ian-Nara Dec 10, 2025
49660be
rename sum to totalRecords
Ian-Nara Dec 10, 2025
cecc3e2
refactor traffic calculator to use Parsed message to avoid duplicate …
Ian-Nara Dec 10, 2025
844332d
use stream to find oldest queue timestamp
Ian-Nara Dec 11, 2025
f8a3d03
rename sumCurrent to recentTrafficTotal, remove dead if condition
Ian-Nara Dec 11, 2025
4e22c34
consolidate warning logs for high message volume, add null check for …
Ian-Nara Dec 11, 2025
11af1ed
update log level for circuit_breaker_triggered. Change ipAddresses fr…
Ian-Nara Dec 11, 2025
8d6cd71
update trafficfiltertest to use Junit5 not Junit4
Ian-Nara Dec 11, 2025
836cd99
remove unused imports
Ian-Nara Dec 11, 2025
ac022fb
TrafficFilter minor efficiency improvements, usee stream to parse IPs…
Ian-Nara Dec 11, 2025
7725d44
TrafficCalculator: remove redundant checks
Ian-Nara Dec 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public BatchProcessingResult processBatch(List<Message> messageBatch, int batchN
* @return true if the message is at least deltaWindowSeconds old
*/
private boolean isMessageEligible(SqsParsedMessage message, long currentTime) {
return currentTime - message.getTimestamp() >= this.deltaWindowSeconds;
return currentTime - message.timestamp() >= this.deltaWindowSeconds;
}

/**
Expand All @@ -140,7 +140,7 @@ List<SqsParsedMessage> filterEligibleMessages(List<SqsParsedMessage> messages, l
*/
private List<Message> identifyInvalidMessages(List<Message> originalBatch, List<SqsParsedMessage> parsedBatch) {
Set<String> validIds = parsedBatch.stream()
.map(p -> p.getOriginalMessage().messageId())
.map(p -> p.originalMessage().messageId())
.collect(Collectors.toSet());

return originalBatch.stream()
Expand Down
72 changes: 46 additions & 26 deletions src/main/java/com/uid2/optout/sqs/SqsMessageOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Utility class for SQS message operations.
Expand Down Expand Up @@ -90,7 +91,7 @@ public static QueueAttributes getQueueAttributes(SqsClient sqsClient, String que
return queueAttributes;

} catch (Exception e) {
LOGGER.info("error getting queue attributes", e);
LOGGER.info("sqs_error: error getting queue attributes", e);
return null;
}
}
Expand Down Expand Up @@ -126,8 +127,8 @@ public static List<Message> receiveMessagesFromSqs(
.queueUrl(queueUrl)
.maxNumberOfMessages(maxMessages)
.visibilityTimeout(visibilityTimeout)
.waitTimeSeconds(0) // Non-blocking poll
.messageSystemAttributeNames(MessageSystemAttributeName.SENT_TIMESTAMP) // Request SQS system timestamp
.waitTimeSeconds(0) // non-blocking poll
.messageSystemAttributeNames(MessageSystemAttributeName.SENT_TIMESTAMP) // request sqs system timestamp
.build();

ReceiveMessageResponse response = sqsClient.receiveMessage(receiveRequest);
Expand All @@ -143,6 +144,7 @@ public static List<Message> receiveMessagesFromSqs(

/**
* Deletes messages from SQS in batches (max 10 per batch).
* Retries failed deletes as long as progress is being made.
*
* @param sqsClient The SQS client
* @param queueUrl The queue URL
Expand All @@ -154,40 +156,58 @@ public static void deleteMessagesFromSqs(SqsClient sqsClient, String queueUrl, L
}

try {
List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
int batchId = 0;
int totalDeleted = 0;
List<DeleteMessageBatchRequestEntry> batch = new ArrayList<>();

for (Message msg : messages) {
entries.add(DeleteMessageBatchRequestEntry.builder()
.id(String.valueOf(batchId++))
.receiptHandle(msg.receiptHandle())
for (int i = 0; i < messages.size(); i++) {
batch.add(DeleteMessageBatchRequestEntry.builder()
.id(String.valueOf(i))
.receiptHandle(messages.get(i).receiptHandle())
.build());

// Send batch when we reach 10 messages or at the end
if (entries.size() == SQS_MAX_DELETE_BATCH_SIZE || batchId == messages.size()) {
DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(entries)
.build();
// send batch when we reach 10 messages or end of list
if (batch.size() == SQS_MAX_DELETE_BATCH_SIZE || i == messages.size() - 1) {
totalDeleted += deleteBatchWithRetry(sqsClient, queueUrl, batch);
batch.clear();
}
}

DeleteMessageBatchResponse deleteResponse = sqsClient.deleteMessageBatch(deleteRequest);
LOGGER.info("deleted {} messages", totalDeleted);
} catch (Exception e) {
LOGGER.error("sqs_error: error deleting messages", e);
}
}

if (!deleteResponse.failed().isEmpty()) {
LOGGER.error("sqs_error: failed to delete {} messages", deleteResponse.failed().size());
} else {
totalDeleted += entries.size();
}
/** Deletes batch, retrying failed entries. Retries once unconditionally, then only while making progress. */
private static int deleteBatchWithRetry(SqsClient sqsClient, String queueUrl, List<DeleteMessageBatchRequestEntry> entries) {
int deleted = 0;
List<DeleteMessageBatchRequestEntry> toDelete = entries;
boolean retriedOnce = false;

entries.clear();
}
while (!toDelete.isEmpty()) {
DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(
DeleteMessageBatchRequest.builder().queueUrl(queueUrl).entries(toDelete).build());

int succeeded = response.successful().size();
deleted += succeeded;

if (response.failed().isEmpty()) {
break; // all done
}

LOGGER.info("deleted {} messages", totalDeleted);
// retry once unconditionally, then only if making progress
if (retriedOnce && succeeded == 0) {
LOGGER.error("sqs_error: {} deletes failed with no progress", response.failed().size());
break;
}
retriedOnce = true;

} catch (Exception e) {
LOGGER.error("sqs_error: exception during message deletion", e);
// retry deletion on the failed messages only
var failedIds = response.failed().stream().map(BatchResultErrorEntry::id).collect(Collectors.toSet());
toDelete = toDelete.stream().filter(e -> failedIds.contains(e.id())).toList();
}

return deleted;
}
}

30 changes: 17 additions & 13 deletions src/main/java/com/uid2/optout/sqs/SqsMessageParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,47 @@ public class SqsMessageParser {
* @return List of parsed messages sorted by timestamp (oldest first)
*/
public static List<SqsParsedMessage> parseAndSortMessages(List<Message> messages) {
List<SqsParsedMessage> parsedMessages = new ArrayList<>();
List<SqsParsedMessage> parsedMessages = new ArrayList<>(messages.size());

for (Message message : messages) {
try {
// Extract SQS system timestamp (in milliseconds), or use current time as fallback
long timestampSeconds = extractTimestamp(message);

// Parse message body
String traceId = null;

try {
// parse message body
JsonObject body = new JsonObject(message.body());
traceId = body.getString("trace_id");

String identityHash = body.getString("identity_hash");
String advertisingId = body.getString("advertising_id");
String traceId = body.getString("trace_id");
String clientIp = body.getString("client_ip");
String email = body.getString("email");
String phone = body.getString("phone");

// extract sqs system timestamp (in milliseconds), or use current time as fallback
long timestampSeconds = extractTimestamp(message, traceId);

if (identityHash == null || advertisingId == null) {
LOGGER.error("sqs_error: invalid message format: {}", message.body());
LOGGER.error("sqs_error: invalid message format, messageId={}, traceId={}", message.messageId(), traceId);
continue;
}

byte[] hashBytes = OptOutUtils.base64StringTobyteArray(identityHash);
byte[] idBytes = OptOutUtils.base64StringTobyteArray(advertisingId);

if (hashBytes == null || idBytes == null) {
LOGGER.error("sqs_error: invalid base64 encoding");
LOGGER.error("sqs_error: invalid base64 encoding, messageId={}, traceId={}", message.messageId(), traceId);
continue;
}

parsedMessages.add(new SqsParsedMessage(message, hashBytes, idBytes, timestampSeconds, email, phone, clientIp, traceId));
} catch (Exception e) {
LOGGER.error("sqs_error: error parsing message", e);
LOGGER.error("sqs_error: error parsing message, messageId={}, traceId={}", message.messageId(), traceId, e);
}
}

// Sort by timestamp
parsedMessages.sort((a, b) -> Long.compare(a.getTimestamp(), b.getTimestamp()));
// sort by timestamp
parsedMessages.sort((a, b) -> Long.compare(a.timestamp(), b.timestamp()));

return parsedMessages;
}
Expand All @@ -70,10 +74,10 @@ public static List<SqsParsedMessage> parseAndSortMessages(List<Message> messages
* @param message The SQS message
* @return Timestamp in seconds
*/
private static long extractTimestamp(Message message) {
private static long extractTimestamp(Message message, String traceId) {
String sentTimestampStr = message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP);
if (sentTimestampStr == null) {
LOGGER.info("message missing SentTimestamp, using current time");
LOGGER.info("message missing SentTimestamp, using current time instead, messageId={}, traceId={}", message.messageId(), traceId);
return OptOutUtils.nowEpochSeconds();
}
return Long.parseLong(sentTimestampStr) / 1000; // ms to seconds
Expand Down
64 changes: 10 additions & 54 deletions src/main/java/com/uid2/optout/sqs/SqsParsedMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,13 @@
/**
* Represents a parsed SQS message containing opt-out data.
*/
public class SqsParsedMessage {
private final Message originalMessage;
private final byte[] hashBytes;
private final byte[] idBytes;
private final long timestamp;
private final String email;
private final String phone;
private final String clientIp;
private final String traceId;

public SqsParsedMessage(Message originalMessage, byte[] hashBytes, byte[] idBytes, long timestamp, String email, String phone, String clientIp, String traceId) {
this.originalMessage = originalMessage;
this.hashBytes = hashBytes;
this.idBytes = idBytes;
this.timestamp = timestamp;
this.email = email;
this.phone = phone;
this.clientIp = clientIp;
this.traceId = traceId;
}

public Message getOriginalMessage() {
return originalMessage;
}

public byte[] getHashBytes() {
return hashBytes;
}

public byte[] getIdBytes() {
return idBytes;
}

public long getTimestamp() {
return timestamp;
}

public String getEmail() {
return email;
}

public String getPhone() {
return phone;
}

public String getClientIp() {
return clientIp;
}

public String getTraceId() {
return traceId;
}
}

public record SqsParsedMessage(
Message originalMessage,
byte[] hashBytes,
byte[] idBytes,
long timestamp,
String email,
String phone,
String clientIp,
String traceId
) {}
2 changes: 1 addition & 1 deletion src/main/java/com/uid2/optout/sqs/SqsWindowReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public WindowReadResult readWindow() {
// Add eligible messages to current window
boolean newWindow = false;
for (SqsParsedMessage msg : batchResult.getMessages()) {
long msgWindowStart = msg.getTimestamp();
long msgWindowStart = msg.timestamp();

// Discover start of window
if (currentWindowStart == 0) {
Expand Down
Loading
Loading