Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e1da3fc
traffic components
Ian-Nara Dec 9, 2025
d6f6d16
addressing PR comments
Ian-Nara Dec 9, 2025
193ab19
update tests
Ian-Nara Dec 9, 2025
8acd1a8
clean up files
Ian-Nara Dec 9, 2025
d9e2641
test fix
Ian-Nara Dec 9, 2025
3aa5c0d
Merge branch 'ian-UID2-6345-sqs-components' into ian-UID2-6345-traffi…
Ian-Nara Dec 9, 2025
609c347
clean up files
Ian-Nara Dec 9, 2025
950af13
try make git render file recreated
Ian-Nara Dec 9, 2025
7c49222
try make git render file recreated
Ian-Nara Dec 9, 2025
6922b0f
create empty
Ian-Nara Dec 9, 2025
52e7818
create empty
Ian-Nara Dec 9, 2025
0eb06a2
file rename
Ian-Nara Dec 10, 2025
952dcf3
file update
Ian-Nara Dec 10, 2025
caf3d0e
file update
Ian-Nara Dec 10, 2025
813cd06
file updates
Ian-Nara Dec 10, 2025
e02e4f2
update comments
Ian-Nara Dec 10, 2025
0979fc1
Merge branch 'main' into ian-UID2-6345-traffic-components
Ian-Nara Dec 10, 2025
49660be
rename sum to totalRecords
Ian-Nara Dec 10, 2025
cecc3e2
refactor traffic calculator to use Parsed message to avoid duplicate …
Ian-Nara Dec 10, 2025
844332d
use stream to find oldest queue timestamp
Ian-Nara Dec 11, 2025
f8a3d03
rename sumCurrent to recentTrafficTotal, remove dead if condition
Ian-Nara Dec 11, 2025
4e22c34
consolidate warning logs for high message volume, add null check for …
Ian-Nara Dec 11, 2025
11af1ed
update log level for circuit_breaker_triggered. Change ipAddresses fr…
Ian-Nara Dec 11, 2025
8d6cd71
update trafficfiltertest to use Junit5 not Junit4
Ian-Nara Dec 11, 2025
836cd99
remove unused imports
Ian-Nara Dec 11, 2025
ac022fb
TrafficFilter minor efficiency improvements, usee stream to parse IPs…
Ian-Nara Dec 11, 2025
7725d44
TrafficCalculator: remove redundant checks
Ian-Nara Dec 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/main/java/com/uid2/optout/sqs/SqsMessageParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,18 @@ public static List<SqsParsedMessage> parseAndSortMessages(List<Message> messages
* @param message The SQS message
* @return Timestamp in seconds
*/
private static long extractTimestamp(Message message, String traceId) {
public static long extractTimestamp(Message message, String traceId) {
String sentTimestampStr = message.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP);
if (sentTimestampStr == null) {
LOGGER.info("message missing SentTimestamp, using current time instead, messageId={}, traceId={}", message.messageId(), traceId);
LOGGER.error("sqs_error: message missing SentTimestamp, using current time instead, messageId={}, traceId={}", message.messageId(), traceId);
return OptOutUtils.nowEpochSeconds();
}
try {
return Long.parseLong(sentTimestampStr) / 1000;
} catch (NumberFormatException e) {
LOGGER.error("sqs_error: invalid SentTimestamp, using current time instead, messageId={}, traceId={}, sentTimestamp={}", message.messageId(), traceId, sentTimestampStr);
return OptOutUtils.nowEpochSeconds();
}
return Long.parseLong(sentTimestampStr) / 1000; // ms to seconds
}
}

602 changes: 602 additions & 0 deletions src/main/java/com/uid2/optout/traffic/TrafficCalculator.java

Large diffs are not rendered by default.

177 changes: 177 additions & 0 deletions src/main/java/com/uid2/optout/traffic/TrafficFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package com.uid2.optout.traffic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.uid2.optout.sqs.SqsParsedMessage;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.charset.StandardCharsets;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;

public class TrafficFilter {
private static final Logger LOGGER = LoggerFactory.getLogger(TrafficFilter.class);

private final String trafficFilterConfigPath;
List<TrafficFilterRule> filterRules;

/**
* Traffic filter rule defining a time range and a list of IP addresses to exclude
*/
private static class TrafficFilterRule {
private final long rangeStart;
private final long rangeEnd;
private final Set<String> ipAddresses;

TrafficFilterRule(long rangeStart, long rangeEnd, Set<String> ipAddresses) {
this.rangeStart = rangeStart;
this.rangeEnd = rangeEnd;
this.ipAddresses = ipAddresses;
}

public long getRangeStart() {
return rangeStart;
}
public long getRangeEnd() {
return rangeEnd;
}
public Set<String> getIpAddresses() {
return ipAddresses;
}
}

public static class MalformedTrafficFilterConfigException extends Exception {
public MalformedTrafficFilterConfigException(String message) {
super(message);
}
}

/**
* Constructor for OptOutTrafficFilter
*
* @param trafficFilterConfigPath S3 path for traffic filter config
* @throws MalformedTrafficFilterConfigException if the traffic filter config is invalid
*/
public TrafficFilter(String trafficFilterConfigPath) throws MalformedTrafficFilterConfigException {
this.trafficFilterConfigPath = trafficFilterConfigPath;
// Initial filter rules load
this.filterRules = Collections.emptyList(); // start empty
reloadTrafficFilterConfig(); // load ConfigMap

LOGGER.info("initialized: filterRules={}", filterRules.size());
}

/**
* Reload traffic filter config from ConfigMap.
* Expected format:
* {
* "denylist_requests": [
* {range: [startTimestamp, endTimestamp], IPs: ["ip1"]},
* {range: [startTimestamp, endTimestamp], IPs: ["ip1", "ip2"]},
* {range: [startTimestamp, endTimestamp], IPs: ["ip1", "ip3"]},
* ]
* }
*
* Can be called periodically to pick up config changes without restarting.
*/
public void reloadTrafficFilterConfig() throws MalformedTrafficFilterConfigException {
LOGGER.info("loading traffic filter config");
try (InputStream is = Files.newInputStream(Paths.get(trafficFilterConfigPath))) {
String content = new String(is.readAllBytes(), StandardCharsets.UTF_8);
JsonObject filterConfigJson = new JsonObject(content);

this.filterRules = parseFilterRules(filterConfigJson);

LOGGER.info("loaded traffic filter config: filterRules={}", filterRules.size());

} catch (Exception e) {
LOGGER.error("circuit_breaker_config_error: no traffic filter config found at {}", trafficFilterConfigPath, e);
throw new MalformedTrafficFilterConfigException(e.getMessage());
}
}

/**
* Parse request filtering rules from JSON config
*/
List<TrafficFilterRule> parseFilterRules(JsonObject config) throws MalformedTrafficFilterConfigException {
List<TrafficFilterRule> rules = new ArrayList<>();
try {
JsonArray denylistRequests = config.getJsonArray("denylist_requests");
if (denylistRequests == null) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so config should at least have an empty list ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, just to avoid key typos when configuring

LOGGER.error("circuit_breaker_config_error: denylist_requests is null");
throw new MalformedTrafficFilterConfigException("invalid traffic filter config: denylist_requests is null");
}
for (int i = 0; i < denylistRequests.size(); i++) {
JsonObject ruleJson = denylistRequests.getJsonObject(i);

// parse and validate range
var rangeJson = ruleJson.getJsonArray("range");
if (rangeJson == null || rangeJson.size() != 2) {
LOGGER.error("circuit_breaker_config_error: rule range is not 2 elements, rule={}", ruleJson.encode());
throw new MalformedTrafficFilterConfigException("invalid traffic filter rule: range is not 2 elements");
}

long start = rangeJson.getLong(0);
long end = rangeJson.getLong(1);

if (start >= end) {
LOGGER.error("circuit_breaker_config_error: rule range start must be less than end, rule={}", ruleJson.encode());
throw new MalformedTrafficFilterConfigException("invalid traffic filter rule: range start must be less than end");
}

if (end - start > 86400) {
LOGGER.error("circuit_breaker_config_error: rule range must be 24 hours or less, rule={}", ruleJson.encode());
throw new MalformedTrafficFilterConfigException("invalid traffic filter rule: range must be 24 hours or less");
}

// parse IPs using stream
var ipAddressesJson = ruleJson.getJsonArray("IPs");
if (ipAddressesJson == null || ipAddressesJson.isEmpty()) {
LOGGER.error("circuit_breaker_config_error: rule IPs is empty, rule={}", ruleJson.encode());
throw new MalformedTrafficFilterConfigException("invalid traffic filter rule: IPs is empty");
}

Set<String> ipAddresses = ipAddressesJson.stream()
.map(Object::toString)
.collect(Collectors.toSet());

TrafficFilterRule rule = new TrafficFilterRule(start, end, ipAddresses);

LOGGER.info("loaded traffic filter rule: range=[{}, {}], IPs={}", rule.getRangeStart(), rule.getRangeEnd(), rule.getIpAddresses());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this log all the IP addresses or just the size of the list ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would log the IP addresses we configured. Do you think we should avoid this?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On one hand, this could flood the logs if the list is too big, on the other hand it would be useful for debugging. Let's keep it for now.

rules.add(rule);
}
return rules;
} catch (Exception e) {
LOGGER.error("circuit_breaker_config_error: failed to parse rules, config={}, error={}", config.encode(), e.getMessage());
throw new MalformedTrafficFilterConfigException(e.getMessage());
}
}

public boolean isDenylisted(SqsParsedMessage message) {
long timestamp = message.timestamp();
String clientIp = message.clientIp();

if (clientIp == null || clientIp.isEmpty()) {
LOGGER.error("sqs_error: request does not contain client ip, messageId={}", message.originalMessage().messageId());
return false;
}

for (TrafficFilterRule rule : filterRules) {
if(timestamp >= rule.getRangeStart() && timestamp <= rule.getRangeEnd()) {
if(rule.getIpAddresses().contains(clientIp)) {
return true;
}
}
}
return false;
}

}
Loading
Loading