From ba183a51e49cedb886aa781f0cb390e6e5680909 Mon Sep 17 00:00:00 2001 From: Mahad Janjua Date: Fri, 29 Aug 2025 10:37:37 -0700 Subject: [PATCH] Add AWS X-Ray Adaptive Sampling Support --- aws-xray/build.gradle.kts | 2 + .../contrib/awsxray/AwsSamplingResult.java | 54 + .../AwsXrayAdaptiveSamplingConfig.java | 148 +++ .../contrib/awsxray/AwsXrayRemoteSampler.java | 71 +- .../awsxray/GetSamplingRulesResponse.java | 24 +- .../awsxray/GetSamplingTargetsRequest.java | 54 +- .../awsxray/GetSamplingTargetsResponse.java | 29 +- .../contrib/awsxray/SamplingRuleApplier.java | 174 +++- .../contrib/awsxray/XrayRulesSampler.java | 382 ++++++- .../awsxray/AwsXrayRemoteSamplerTest.java | 41 + .../awsxray/SamplingRuleApplierTest.java | 222 +++- .../contrib/awsxray/XrayRulesSamplerTest.java | 981 +++++++++++++++++- .../awsxray/XraySamplerClientTest.java | 6 +- .../test/resources/sampling-rule-boost.json | 22 + disk-buffering/build.gradle.kts | 4 + 15 files changed, 2131 insertions(+), 83 deletions(-) create mode 100644 aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java create mode 100644 aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayAdaptiveSamplingConfig.java create mode 100644 aws-xray/src/test/resources/sampling-rule-boost.json diff --git a/aws-xray/build.gradle.kts b/aws-xray/build.gradle.kts index 54dabba71..d56b12bd2 100644 --- a/aws-xray/build.gradle.kts +++ b/aws-xray/build.gradle.kts @@ -11,6 +11,7 @@ dependencies { api("io.opentelemetry:opentelemetry-sdk-trace") compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") + implementation("io.opentelemetry.semconv:opentelemetry-semconv:1.32.0-alpha") implementation("com.squareup.okhttp3:okhttp") implementation("io.opentelemetry.semconv:opentelemetry-semconv") @@ -25,6 +26,7 @@ dependencies { implementation("com.fasterxml.jackson.core:jackson-core") implementation("com.fasterxml.jackson.core:jackson-databind") + implementation("com.github.ben-manes.caffeine:caffeine:2.9.3") testImplementation("com.linecorp.armeria:armeria-junit5") testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java new file mode 100644 index 000000000..41f22f903 --- /dev/null +++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsSamplingResult.java @@ -0,0 +1,54 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.awsxray; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.sdk.trace.samplers.SamplingDecision; +import io.opentelemetry.sdk.trace.samplers.SamplingResult; + +final class AwsSamplingResult implements SamplingResult { + + // OTel trace state is a space shared with other vendors with a 256 character limit + // We keep the key and values as short as possible while still identifiable + public static final String AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY = "xrsr"; + + private final SamplingDecision decision; + private final Attributes attributes; + private final String samplingRuleName; + + private AwsSamplingResult( + SamplingDecision decision, Attributes attributes, String samplingRuleName) { + this.decision = decision; + this.attributes = attributes; + this.samplingRuleName = samplingRuleName; + } + + static AwsSamplingResult create( + SamplingDecision decision, Attributes attributes, String samplingRuleName) { + return new AwsSamplingResult(decision, attributes, samplingRuleName); + } + + @Override + public SamplingDecision getDecision() { + return decision; + } + + @Override + public Attributes getAttributes() { + return attributes; + } + + @Override + public TraceState getUpdatedTraceState(TraceState parentTraceState) { + if (parentTraceState.get(AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY) == null) { + return parentTraceState.toBuilder() + .put(AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY, samplingRuleName) + .build(); + } + return parentTraceState; + } +} diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayAdaptiveSamplingConfig.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayAdaptiveSamplingConfig.java new file mode 100644 index 000000000..dc5b7a013 --- /dev/null +++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayAdaptiveSamplingConfig.java @@ -0,0 +1,148 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.awsxray; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.auto.value.AutoValue; +import java.util.List; +import javax.annotation.Nullable; + +@AutoValue +@JsonSerialize(as = AwsXrayAdaptiveSamplingConfig.class) +@JsonDeserialize(builder = AutoValue_AwsXrayAdaptiveSamplingConfig.Builder.class) +public abstract class AwsXrayAdaptiveSamplingConfig { + + @JsonProperty("version") + public abstract double getVersion(); + + @JsonProperty("anomalyConditions") + @Nullable + public abstract List getAnomalyConditions(); + + @JsonProperty("anomalyCaptureLimit") + @Nullable + public abstract AnomalyCaptureLimit getAnomalyCaptureLimit(); + + public static Builder builder() { + return new AutoValue_AwsXrayAdaptiveSamplingConfig.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + @JsonProperty("version") + public abstract Builder setVersion(double value); + + @JsonProperty("anomalyConditions") + public abstract Builder setAnomalyConditions(List value); + + @JsonProperty("anomalyCaptureLimit") + public abstract Builder setAnomalyCaptureLimit(AnomalyCaptureLimit value); + + public abstract AwsXrayAdaptiveSamplingConfig build(); + } + + @AutoValue + @JsonDeserialize( + builder = AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyConditions.Builder.class) + public abstract static class AnomalyConditions { + @JsonProperty("errorCodeRegex") + @Nullable + public abstract String getErrorCodeRegex(); + + @JsonProperty("operations") + @Nullable + public abstract List getOperations(); + + @JsonProperty("highLatencyMs") + @Nullable + public abstract Long getHighLatencyMs(); + + @JsonProperty("usage") + @Nullable + public abstract UsageType getUsage(); + + public static Builder builder() { + return new AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyConditions.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + @JsonProperty("errorCodeRegex") + public abstract Builder setErrorCodeRegex(String value); + + @JsonProperty("operations") + public abstract Builder setOperations(List value); + + @JsonProperty("highLatencyMs") + public abstract Builder setHighLatencyMs(Long value); + + @JsonProperty("usage") + public abstract Builder setUsage(UsageType value); + + public abstract AnomalyConditions build(); + } + } + + public enum UsageType { + BOTH("both"), + SAMPLING_BOOST("sampling-boost"), + ANOMALY_TRACE_CAPTURE("anomaly-trace-capture"), + NEITHER("neither"); // Not meant to be used by customers + + private final String value; + + UsageType(String value) { + this.value = value; + } + + @JsonValue + public String getValue() { + return value; + } + + @JsonCreator + public static UsageType fromValue(String value) { + for (UsageType type : values()) { + if (type.value.equals(value)) { + return type; + } + } + throw new IllegalArgumentException("Invalid usage value: " + value); + } + + public static boolean isUsedForBoost(UsageType usage) { + return BOTH.equals(usage) || SAMPLING_BOOST.equals(usage); + } + + public static boolean isUsedForAnomalyTraceCapture(UsageType usage) { + return BOTH.equals(usage) || ANOMALY_TRACE_CAPTURE.equals(usage); + } + } + + @AutoValue + @JsonDeserialize( + builder = AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyCaptureLimit.Builder.class) + public abstract static class AnomalyCaptureLimit { + @JsonProperty("anomalyTracesPerSecond") + public abstract int getAnomalyTracesPerSecond(); + + public static Builder builder() { + return new AutoValue_AwsXrayAdaptiveSamplingConfig_AnomalyCaptureLimit.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + @JsonProperty("anomalyTracesPerSecond") + public abstract Builder setAnomalyTracesPerSecond(int value); + + public abstract AnomalyCaptureLimit build(); + } + } +} diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java index ad9b72a2c..7864f3588 100644 --- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java +++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSampler.java @@ -9,16 +9,22 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; import io.opentelemetry.contrib.awsxray.GetSamplingRulesResponse.SamplingRuleRecord; +import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingBoostStatisticsDocument; import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingStatisticsDocument; import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.sdk.trace.samplers.SamplingResult; import java.io.Closeable; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -43,6 +49,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { private static final Logger logger = Logger.getLogger(AwsXrayRemoteSampler.class.getName()); + // Default batch size to be same as OTel BSP default + private static final int maxExportBatchSize = 512; + private final Resource resource; private final Clock clock; private final Sampler initialSampler; @@ -59,6 +68,9 @@ public final class AwsXrayRemoteSampler implements Sampler, Closeable { @Nullable private volatile XrayRulesSampler internalXrayRulesSampler; private volatile Sampler sampler; + @Nullable private AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig; + @Nullable private BatchSpanProcessor bsp; + /** * Returns a {@link AwsXrayRemoteSamplerBuilder} with the given {@link Resource}. This {@link * Resource} should be the same as what the OpenTelemetry SDK is configured with. @@ -120,6 +132,40 @@ public String getDescription() { return "AwsXrayRemoteSampler{" + sampler.getDescription() + "}"; } + public void setAdaptiveSamplingConfig(AwsXrayAdaptiveSamplingConfig config) { + if (this.adaptiveSamplingConfig != null) { + throw new IllegalStateException("Programming bug - Adaptive sampling config is already set"); + } else if (config != null && this.adaptiveSamplingConfig == null) { + // Save here and also pass to XrayRulesSampler directly as it already exists + this.adaptiveSamplingConfig = config; + if (internalXrayRulesSampler != null) { + internalXrayRulesSampler.setAdaptiveSamplingConfig(config); + } + } + } + + public void setSpanExporter(SpanExporter spanExporter) { + if (this.bsp != null) { + throw new IllegalStateException("Programming bug - BatchSpanProcessor is already set"); + } else if (spanExporter != null && this.bsp == null) { + this.bsp = + BatchSpanProcessor.builder(spanExporter) + .setExportUnsampledSpans(true) // Required to capture the unsampled anomaly spans + .setMaxExportBatchSize(maxExportBatchSize) + .build(); + } + } + + public void adaptSampling(ReadableSpan span, SpanData spanData) { + if (this.bsp == null) { + throw new IllegalStateException( + "Programming bug - BatchSpanProcessor is null while trying to adapt sampling"); + } + if (internalXrayRulesSampler != null) { + internalXrayRulesSampler.adaptSampling(span, spanData, this.bsp::onEnd); + } + } + private void getAndUpdateSampler() { try { // No pagination support yet, or possibly ever. @@ -134,8 +180,8 @@ private void getAndUpdateSampler() { initialSampler, response.getSamplingRules().stream() .map(SamplingRuleRecord::getRule) - .collect(Collectors.toList()))); - + .collect(Collectors.toList()), + adaptiveSamplingConfig)); previousRulesResponse = response; ScheduledFuture existingFetchTargetsFuture = fetchTargetsFuture; if (existingFetchTargetsFuture != null) { @@ -179,14 +225,29 @@ private void fetchTargets() { XrayRulesSampler xrayRulesSampler = this.internalXrayRulesSampler; try { Date now = Date.from(Instant.ofEpochSecond(0, clock.now())); - List statistics = xrayRulesSampler.snapshot(now); + List statisticsSnapshot = + xrayRulesSampler.snapshot(now); + List statistics = new ArrayList(); + List boostStatistics = + new ArrayList(); + statisticsSnapshot.stream() + .forEach( + snapshot -> { + if (snapshot.getStatisticsDocument() != null) { + statistics.add(snapshot.getStatisticsDocument()); + } + if (snapshot.getBoostStatisticsDocument() != null + && snapshot.getBoostStatisticsDocument().getTotalCount() > 0) { + boostStatistics.add(snapshot.getBoostStatisticsDocument()); + } + }); Set requestedTargetRuleNames = statistics.stream() .map(SamplingStatisticsDocument::getRuleName) .collect(Collectors.toSet()); - GetSamplingTargetsResponse response = - client.getSamplingTargets(GetSamplingTargetsRequest.create(statistics)); + GetSamplingTargetsRequest req = GetSamplingTargetsRequest.create(statistics, boostStatistics); + GetSamplingTargetsResponse response = client.getSamplingTargets(req); Map targets = response.getDocuments().stream() .collect(Collectors.toMap(SamplingTargetDocument::getRuleName, Function.identity())); diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java index dca930d5c..01835dc2c 100644 --- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java +++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingRulesResponse.java @@ -62,7 +62,8 @@ static SamplingRule create( @JsonProperty("ServiceName") String serviceName, @JsonProperty("ServiceType") String serviceType, @JsonProperty("URLPath") String urlPath, - @JsonProperty("Version") int version) { + @JsonProperty("Version") int version, + @JsonProperty("SamplingRateBoost") @Nullable SamplingRateBoost samplingRateBoost) { return new AutoValue_GetSamplingRulesResponse_SamplingRule( attributes, fixedRate, @@ -76,7 +77,8 @@ static SamplingRule create( serviceName, serviceType, urlPath, - version); + version, + samplingRateBoost); } abstract Map getAttributes(); @@ -106,5 +108,23 @@ static SamplingRule create( abstract String getUrlPath(); abstract int getVersion(); + + @Nullable + abstract SamplingRateBoost getSamplingRateBoost(); + } + + @AutoValue + abstract static class SamplingRateBoost { + @JsonCreator + static SamplingRateBoost create( + @JsonProperty("MaxRate") double maxRate, + @JsonProperty("CooldownWindowMinutes") long cooldownWindowMinutes) { + return new AutoValue_GetSamplingRulesResponse_SamplingRateBoost( + maxRate, cooldownWindowMinutes); + } + + abstract double getMaxRate(); + + abstract long getCooldownWindowMinutes(); } } diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java index 7d1fb7b78..9404f73ed 100644 --- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java +++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsRequest.java @@ -15,14 +15,20 @@ @JsonSerialize(as = GetSamplingTargetsRequest.class) abstract class GetSamplingTargetsRequest { - static GetSamplingTargetsRequest create(List documents) { - return new AutoValue_GetSamplingTargetsRequest(documents); + static GetSamplingTargetsRequest create( + List documents, + List boostDocuments) { + return new AutoValue_GetSamplingTargetsRequest(documents, boostDocuments); } // Limit of 25 items @JsonProperty("SamplingStatisticsDocuments") abstract List getDocuments(); + // Limit of 25 items + @JsonProperty("SamplingBoostStatisticsDocuments") + abstract List getBoostDocuments(); + @AutoValue @JsonSerialize(as = SamplingStatisticsDocument.class) abstract static class SamplingStatisticsDocument { @@ -66,4 +72,48 @@ abstract static class Builder { abstract SamplingStatisticsDocument build(); } } + + @AutoValue + @JsonSerialize(as = SamplingBoostStatisticsDocument.class) + abstract static class SamplingBoostStatisticsDocument { + + static SamplingBoostStatisticsDocument.Builder newBuilder() { + return new AutoValue_GetSamplingTargetsRequest_SamplingBoostStatisticsDocument.Builder(); + } + + @JsonProperty("RuleName") + abstract String getRuleName(); + + @JsonProperty("ServiceName") + abstract String getServiceName(); + + @JsonProperty("Timestamp") + abstract Date getTimestamp(); + + @JsonProperty("AnomalyCount") + abstract long getAnomalyCount(); + + @JsonProperty("TotalCount") + abstract long getTotalCount(); + + @JsonProperty("SampledAnomalyCount") + abstract long getSampledAnomalyCount(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setRuleName(String ruleName); + + abstract Builder setServiceName(String serviceName); + + abstract Builder setTimestamp(Date timestamp); + + abstract Builder setAnomalyCount(long anomalyCount); + + abstract Builder setTotalCount(long totalCount); + + abstract Builder setSampledAnomalyCount(long sampledAnomalyCount); + + abstract SamplingBoostStatisticsDocument build(); + } + } } diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java index c1e178f54..406f07e2b 100644 --- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java +++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/GetSamplingTargetsResponse.java @@ -19,9 +19,11 @@ abstract class GetSamplingTargetsResponse { static GetSamplingTargetsResponse create( @JsonProperty("LastRuleModification") Date lastRuleModification, @JsonProperty("SamplingTargetDocuments") List documents, - @JsonProperty("UnprocessedStatistics") List unprocessedStatistics) { + @JsonProperty("UnprocessedStatistics") List unprocessedStatistics, + @JsonProperty("UnprocessedBoostStatistics") @Nullable + List unprocessedBoostStatistics) { return new AutoValue_GetSamplingTargetsResponse( - lastRuleModification, documents, unprocessedStatistics); + lastRuleModification, documents, unprocessedStatistics, unprocessedBoostStatistics); } abstract Date getLastRuleModification(); @@ -30,6 +32,9 @@ static GetSamplingTargetsResponse create( abstract List getUnprocessedStatistics(); + @Nullable + abstract List getUnprocessedBoostStatistics(); + @AutoValue abstract static class SamplingTargetDocument { @@ -39,9 +44,10 @@ static SamplingTargetDocument create( @JsonProperty("Interval") @Nullable Integer intervalSecs, @JsonProperty("ReservoirQuota") @Nullable Integer reservoirQuota, @JsonProperty("ReservoirQuotaTTL") @Nullable Date reservoirQuotaTtl, + @JsonProperty("SamplingBoost") @Nullable SamplingBoost samplingBoost, @JsonProperty("RuleName") String ruleName) { return new AutoValue_GetSamplingTargetsResponse_SamplingTargetDocument( - fixedRate, intervalSecs, reservoirQuota, reservoirQuotaTtl, ruleName); + fixedRate, intervalSecs, reservoirQuota, reservoirQuotaTtl, samplingBoost, ruleName); } abstract double getFixedRate(); @@ -57,6 +63,9 @@ static SamplingTargetDocument create( @Nullable abstract Date getReservoirQuotaTtl(); + @Nullable + abstract SamplingBoost getSamplingBoost(); + abstract String getRuleName(); } @@ -78,4 +87,18 @@ static UnprocessedStatistics create( abstract String getRuleName(); } + + @AutoValue + abstract static class SamplingBoost { + @JsonCreator + static SamplingBoost create( + @JsonProperty("BoostRate") double boostRate, + @JsonProperty("BoostRateTTL") Date boostRateTtl) { + return new AutoValue_GetSamplingTargetsResponse_SamplingBoost(boostRate, boostRateTtl); + } + + abstract double getBoostRate(); + + abstract Date getBoostRateTtl(); + } } diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java index 1d97c4aed..6462c7f3e 100644 --- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java +++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplier.java @@ -11,10 +11,13 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; +import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingBoostStatisticsDocument; import io.opentelemetry.contrib.awsxray.GetSamplingTargetsRequest.SamplingStatisticsDocument; +import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingBoost; import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.sdk.trace.samplers.SamplingDecision; @@ -76,12 +79,20 @@ final class SamplingRuleApplier { private final String clientId; private final String ruleName; + private final String serviceName; private final Clock clock; private final Sampler reservoirSampler; private final long reservoirEndTimeNanos; + private final double fixedRate; private final Sampler fixedRateSampler; private final boolean borrowing; + // Adaptive sampling related configs + private final boolean hasBoost; + private final double boostedFixedRate; + private final Long boostEndTimeNanos; + private final Sampler boostedFixedRateSampler; + private final Map attributeMatchers; private final Matcher urlPathMatcher; private final Matcher serviceNameMatcher; @@ -94,7 +105,11 @@ final class SamplingRuleApplier { private final long nextSnapshotTimeNanos; - SamplingRuleApplier(String clientId, GetSamplingRulesResponse.SamplingRule rule, Clock clock) { + SamplingRuleApplier( + String clientId, + GetSamplingRulesResponse.SamplingRule rule, + @Nullable String serviceName, + Clock clock) { this.clientId = clientId; this.clock = clock; String ruleName = rule.getRuleName(); @@ -108,6 +123,8 @@ final class SamplingRuleApplier { } this.ruleName = ruleName; + this.serviceName = serviceName == null ? "default" : serviceName; + // We don't have a SamplingTarget so are ready to report a snapshot right away. nextSnapshotTimeNanos = clock.nanoTime(); @@ -124,7 +141,15 @@ final class SamplingRuleApplier { reservoirSampler = Sampler.alwaysOff(); borrowing = false; } - fixedRateSampler = createFixedRate(rule.getFixedRate()); + fixedRate = rule.getFixedRate(); + fixedRateSampler = createFixedRate(fixedRate); + + // Check if the rule has a sampling rate boost option + hasBoost = rule.getSamplingRateBoost() != null; + + boostedFixedRate = fixedRate; + boostedFixedRateSampler = createFixedRate(fixedRate); + boostEndTimeNanos = clock.nanoTime(); if (rule.getAttributes().isEmpty()) { attributeMatchers = Collections.emptyMap(); @@ -147,11 +172,16 @@ final class SamplingRuleApplier { private SamplingRuleApplier( String clientId, String ruleName, + String serviceName, Clock clock, Sampler reservoirSampler, long reservoirEndTimeNanos, + double fixedRate, Sampler fixedRateSampler, boolean borrowing, + double boostedFixedRate, + Long boostEndTimeNanos, + boolean hasBoost, Map attributeMatchers, Matcher urlPathMatcher, Matcher serviceNameMatcher, @@ -163,11 +193,16 @@ private SamplingRuleApplier( long nextSnapshotTimeNanos) { this.clientId = clientId; this.ruleName = ruleName; + this.serviceName = serviceName; this.clock = clock; this.reservoirSampler = reservoirSampler; this.reservoirEndTimeNanos = reservoirEndTimeNanos; + this.fixedRate = fixedRate; this.fixedRateSampler = fixedRateSampler; this.borrowing = borrowing; + this.boostedFixedRate = boostedFixedRate; + this.boostEndTimeNanos = boostEndTimeNanos; + this.hasBoost = hasBoost; this.attributeMatchers = attributeMatchers; this.urlPathMatcher = urlPathMatcher; this.serviceNameMatcher = serviceNameMatcher; @@ -177,6 +212,7 @@ private SamplingRuleApplier( this.resourceArnMatcher = resourceArnMatcher; this.statistics = statistics; this.nextSnapshotTimeNanos = nextSnapshotTimeNanos; + this.boostedFixedRateSampler = createFixedRate(this.boostedFixedRate); } @SuppressWarnings("deprecation") // TODO @@ -273,45 +309,84 @@ SamplingResult shouldSample( statistics.sampled.increment(); return result; } - result = - fixedRateSampler.shouldSample( - parentContext, traceId, name, spanKind, attributes, parentLinks); + + if (clock.nanoTime() < boostEndTimeNanos) { + result = + boostedFixedRateSampler.shouldSample( + parentContext, traceId, name, spanKind, attributes, parentLinks); + } else { + result = + fixedRateSampler.shouldSample( + parentContext, traceId, name, spanKind, attributes, parentLinks); + } if (result.getDecision() != SamplingDecision.DROP) { statistics.sampled.increment(); } return result; } + void countTrace() { + statistics.traces.increment(); + } + + void countAnomalyTrace(ReadableSpan span) { + statistics.anomalies.increment(); + + if (span.getSpanContext().isSampled()) { + statistics.anomaliesSampled.increment(); + } + } + @Nullable - SamplingStatisticsDocument snapshot(Date now) { + SamplingRuleStatisticsSnapshot snapshot(Date now) { if (clock.nanoTime() < nextSnapshotTimeNanos) { return null; } - return SamplingStatisticsDocument.newBuilder() - .setClientId(clientId) - .setRuleName(ruleName) - .setTimestamp(now) - // Resetting requests first ensures that sample / borrow rate are positive after the reset. - // Snapshotting is not concurrent so this ensures they are always positive. - .setRequestCount(statistics.requests.sumThenReset()) - .setSampledCount(statistics.sampled.sumThenReset()) - .setBorrowCount(statistics.borrowed.sumThenReset()) - .build(); + long totalCount = statistics.requests.sumThenReset(); + long sampledCount = statistics.sampled.sumThenReset(); + long borrowCount = statistics.borrowed.sumThenReset(); + long traceCount = statistics.traces.sumThenReset(); + long anomalyCount = statistics.anomalies.sumThenReset(); + long sampledAnomalyCount = statistics.anomaliesSampled.sumThenReset(); + SamplingStatisticsDocument samplingStatistics = + SamplingStatisticsDocument.newBuilder() + .setClientId(clientId) + .setRuleName(ruleName) + .setTimestamp(now) + // Resetting requests first ensures that sample / borrow rate are positive after the + // reset. + // Snapshotting is not concurrent so this ensures they are always positive. + .setRequestCount(totalCount) + .setSampledCount(sampledCount) + .setBorrowCount(borrowCount) + .build(); + SamplingBoostStatisticsDocument boostDoc = + SamplingBoostStatisticsDocument.newBuilder() + .setRuleName(ruleName) + .setServiceName(serviceName) + .setTimestamp(now) + .setTotalCount(traceCount) + .setAnomalyCount(anomalyCount) + .setSampledAnomalyCount(sampledAnomalyCount) + .build(); + return new SamplingRuleStatisticsSnapshot(samplingStatistics, boostDoc); } long getNextSnapshotTimeNanos() { return nextSnapshotTimeNanos; } - SamplingRuleApplier withTarget(SamplingTargetDocument target, Date now) { + // currentNanoTime is passed in to ensure all uses of withTarget are used with the same baseline + // time reference + SamplingRuleApplier withTarget(SamplingTargetDocument target, Date now, long currentNanoTime) { Sampler newFixedRateSampler = createFixedRate(target.getFixedRate()); Sampler newReservoirSampler = Sampler.alwaysOff(); - long newReservoirEndTimeNanos = clock.nanoTime(); + long newReservoirEndTimeNanos = currentNanoTime; // Not well documented but a quota should always come with a TTL if (target.getReservoirQuota() != null && target.getReservoirQuotaTtl() != null) { newReservoirSampler = createRateLimited(target.getReservoirQuota()); newReservoirEndTimeNanos = - clock.nanoTime() + currentNanoTime + Duration.between(now.toInstant(), target.getReservoirQuotaTtl().toInstant()) .toNanos(); } @@ -319,16 +394,36 @@ SamplingRuleApplier withTarget(SamplingTargetDocument target, Date now) { target.getIntervalSecs() != null ? TimeUnit.SECONDS.toNanos(target.getIntervalSecs()) : AwsXrayRemoteSampler.DEFAULT_TARGET_INTERVAL_NANOS; - long newNextSnapshotTimeNanos = clock.nanoTime() + intervalNanos; + long newNextSnapshotTimeNanos = currentNanoTime + intervalNanos; + + double newBoostedFixedRate = fixedRate; + long newBoostEndTimeNanos = currentNanoTime; + if (target.getSamplingBoost() != null) { + SamplingBoost samplingBoostMap = target.getSamplingBoost(); + if (samplingBoostMap != null + && samplingBoostMap.getBoostRate() >= target.getFixedRate() + && samplingBoostMap.getBoostRateTtl() != null) { + newBoostedFixedRate = samplingBoostMap.getBoostRate(); + newBoostEndTimeNanos = + currentNanoTime + + Duration.between(now.toInstant(), samplingBoostMap.getBoostRateTtl().toInstant()) + .toNanos(); + } + } return new SamplingRuleApplier( clientId, ruleName, + serviceName, clock, newReservoirSampler, newReservoirEndTimeNanos, + fixedRate, newFixedRateSampler, /* borrowing= */ false, + newBoostedFixedRate, + newBoostEndTimeNanos, + hasBoost, attributeMatchers, urlPathMatcher, serviceNameMatcher, @@ -344,11 +439,16 @@ SamplingRuleApplier withNextSnapshotTimeNanos(long newNextSnapshotTimeNanos) { return new SamplingRuleApplier( clientId, ruleName, + serviceName, clock, reservoirSampler, reservoirEndTimeNanos, + fixedRate, fixedRateSampler, borrowing, + boostedFixedRate, + boostEndTimeNanos, + hasBoost, attributeMatchers, urlPathMatcher, serviceNameMatcher, @@ -364,6 +464,15 @@ String getRuleName() { return ruleName; } + // For testing + String getServiceName() { + return serviceName; + } + + boolean hasBoost() { + return hasBoost; + } + @Nullable private static String getArn(Attributes attributes, Resource resource) { String arn = resource.getAttributes().get(AWS_ECS_CONTAINER_ARN); @@ -515,5 +624,30 @@ private static class Statistics { final LongAdder requests = new LongAdder(); final LongAdder sampled = new LongAdder(); final LongAdder borrowed = new LongAdder(); + final LongAdder traces = new LongAdder(); + final LongAdder anomalies = new LongAdder(); + final LongAdder anomaliesSampled = new LongAdder(); + } + + static class SamplingRuleStatisticsSnapshot { + final SamplingStatisticsDocument statisticsDocument; + final SamplingBoostStatisticsDocument boostStatisticsDocument; + + // final SamplingBoostStatisticsDocument boostStatisticsDocument; + + SamplingRuleStatisticsSnapshot( + SamplingStatisticsDocument statisticsDocument, + SamplingBoostStatisticsDocument boostStatisticsDocument) { + this.statisticsDocument = statisticsDocument; + this.boostStatisticsDocument = boostStatisticsDocument; + } + + SamplingStatisticsDocument getStatisticsDocument() { + return statisticsDocument; + } + + SamplingBoostStatisticsDocument getBoostStatisticsDocument() { + return boostStatisticsDocument; + } } } diff --git a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java index 75977dc0f..9620ba2b5 100644 --- a/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java +++ b/aws-xray/src/main/java/io/opentelemetry/contrib/awsxray/XrayRulesSampler.java @@ -5,42 +5,79 @@ package io.opentelemetry.contrib.awsxray; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; +import static io.opentelemetry.semconv.ServiceAttributes.SERVICE_NAME; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Context; import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.Duration; import java.util.Arrays; import java.util.Comparator; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; +import javax.annotation.Nullable; final class XrayRulesSampler implements Sampler { private static final Logger logger = Logger.getLogger(XrayRulesSampler.class.getName()); + public static final AttributeKey AWS_XRAY_SAMPLING_RULE = + AttributeKey.stringKey("aws.xray.sampling_rule"); + + // Used for generating operation + private static final String UNKNOWN_OPERATION = "UnknownOperation"; + private static final AttributeKey URL_PATH = AttributeKey.stringKey("url.path"); + private static final AttributeKey HTTP_TARGET = AttributeKey.stringKey("http.target"); + private static final AttributeKey HTTP_REQUEST_METHOD = + AttributeKey.stringKey("http.request.method"); + private static final AttributeKey HTTP_METHOD = AttributeKey.stringKey("http.method"); + private final String clientId; private final Resource resource; private final Clock clock; private final Sampler fallbackSampler; private final SamplingRuleApplier[] ruleAppliers; + private final Map ruleToHashMap; + private final Map hashToRuleMap; + + private final boolean adaptiveSamplingRuleExists; + private final Cache traceUsageCache; + + @Nullable private AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig; + @Nullable private RateLimiter anomalyCaptureRateLimiter; XrayRulesSampler( String clientId, Resource resource, Clock clock, Sampler fallbackSampler, - List rules) { + List rules, + @Nullable AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig) { this( clientId, resource, @@ -49,8 +86,19 @@ final class XrayRulesSampler implements Sampler { rules.stream() // Lower priority value takes precedence so normal ascending sort. .sorted(Comparator.comparingInt(GetSamplingRulesResponse.SamplingRule::getPriority)) - .map(rule -> new SamplingRuleApplier(clientId, rule, clock)) - .toArray(SamplingRuleApplier[]::new)); + .map( + rule -> + new SamplingRuleApplier( + clientId, rule, resource.getAttribute(SERVICE_NAME), clock)) + .toArray(SamplingRuleApplier[]::new), + createRuleHashMaps(rules), + rules.stream().anyMatch(r -> r.getSamplingRateBoost() != null), + adaptiveSamplingConfig, + Caffeine.newBuilder() + .maximumSize(100_000) + .ticker(clock::nanoTime) + .expireAfterWrite(Duration.ofMinutes(10)) + .build()); } private XrayRulesSampler( @@ -58,12 +106,36 @@ private XrayRulesSampler( Resource resource, Clock clock, Sampler fallbackSampler, - SamplingRuleApplier[] ruleAppliers) { + SamplingRuleApplier[] ruleAppliers, + Map ruleToHashMap, + boolean adaptiveSamplingRuleExists, + @Nullable AwsXrayAdaptiveSamplingConfig adaptiveSamplingConfig, + Cache traceUsageCache) { this.clientId = clientId; this.resource = resource; this.clock = clock; this.fallbackSampler = fallbackSampler; this.ruleAppliers = ruleAppliers; + this.ruleToHashMap = ruleToHashMap; + this.hashToRuleMap = new HashMap<>(); + for (Map.Entry entry : ruleToHashMap.entrySet()) { + this.hashToRuleMap.put(entry.getValue(), entry.getKey()); + } + this.adaptiveSamplingRuleExists = adaptiveSamplingRuleExists; + this.adaptiveSamplingConfig = adaptiveSamplingConfig; + this.traceUsageCache = traceUsageCache; + + // Initialize anomaly capture rate limiter + if (this.adaptiveSamplingConfig != null + && this.adaptiveSamplingConfig.getAnomalyCaptureLimit() == null) { + this.anomalyCaptureRateLimiter = new RateLimiter(1, 1, clock); + } else if (adaptiveSamplingConfig != null + && adaptiveSamplingConfig.getAnomalyCaptureLimit() != null) { + int anomalyTracesPerSecond = + adaptiveSamplingConfig.getAnomalyCaptureLimit().getAnomalyTracesPerSecond(); + this.anomalyCaptureRateLimiter = + new RateLimiter(anomalyTracesPerSecond, anomalyTracesPerSecond, clock); + } } @Override @@ -74,10 +146,36 @@ public SamplingResult shouldSample( SpanKind spanKind, Attributes attributes, List parentLinks) { + String upstreamMatchedRule = + Span.fromContext(parentContext) + .getSpanContext() + .getTraceState() + .get(AwsSamplingResult.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY); for (SamplingRuleApplier applier : ruleAppliers) { if (applier.matches(attributes, resource)) { - return applier.shouldSample( - parentContext, traceId, name, spanKind, attributes, parentLinks); + SamplingResult result = + applier.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks); + + // If the trace state has a sampling rule reference, propagate it + // Otherwise, encode and propagate the matched sampling rule using AwsSamplingResult + String ruleToPropagate; + if (upstreamMatchedRule != null) { + ruleToPropagate = hashToRuleMap.getOrDefault(upstreamMatchedRule, applier.getRuleName()); + } else { + ruleToPropagate = applier.getRuleName(); + } + String hashedRule = ruleToHashMap.getOrDefault(ruleToPropagate, ruleToPropagate); + if (this.adaptiveSamplingConfig != null + && this.adaptiveSamplingConfig.getAnomalyCaptureLimit() != null) { + // If the span is capturable based on local SDK config, add sampling rule attribute + return AwsSamplingResult.create( + result.getDecision(), + result.getAttributes().toBuilder() + .put(AWS_XRAY_SAMPLING_RULE.getKey(), ruleToPropagate) + .build(), + hashedRule); + } + return AwsSamplingResult.create(result.getDecision(), result.getAttributes(), hashedRule); } } @@ -96,7 +194,184 @@ public String getDescription() { return "XrayRulesSampler{" + Arrays.toString(ruleAppliers) + "}"; } - List snapshot(Date now) { + void setAdaptiveSamplingConfig(AwsXrayAdaptiveSamplingConfig config) { + if (this.adaptiveSamplingConfig != null) { + throw new IllegalStateException("Programming bug - Adaptive sampling config is already set"); + } else if (config != null && this.adaptiveSamplingConfig == null) { + this.adaptiveSamplingConfig = config; + + // Initialize anomaly capture rate limiter if error capture limit is configured + if (config.getAnomalyCaptureLimit() != null) { + int anomalyTracesPerSecond = config.getAnomalyCaptureLimit().getAnomalyTracesPerSecond(); + this.anomalyCaptureRateLimiter = + new RateLimiter(anomalyTracesPerSecond, anomalyTracesPerSecond, clock); + } + } + } + + void adaptSampling(ReadableSpan span, SpanData spanData, Consumer spanBatcher) { + if (!adaptiveSamplingRuleExists && this.adaptiveSamplingConfig == null) { + return; + } + Long statusCode = spanData.getAttributes().get(HTTP_RESPONSE_STATUS_CODE); + + boolean shouldBoostSampling = false; + boolean shouldCaptureAnomalySpan = false; + + List anomalyConditions = + adaptiveSamplingConfig != null ? adaptiveSamplingConfig.getAnomalyConditions() : null; + // Empty list -> no conditions will apply and we will not do anything + if (anomalyConditions != null && !anomalyConditions.isEmpty()) { + String operation = spanData.getAttributes().get(AwsAttributeKeys.AWS_LOCAL_OPERATION); + if (operation == null) { + operation = generateIngressOperation(spanData); + } + for (AwsXrayAdaptiveSamplingConfig.AnomalyConditions condition : anomalyConditions) { + // Skip condition if it would only re-apply action already being taken + if ((shouldBoostSampling + && AwsXrayAdaptiveSamplingConfig.UsageType.SAMPLING_BOOST.equals( + condition.getUsage())) + || (shouldCaptureAnomalySpan + && AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE.equals( + condition.getUsage()))) { + continue; + } + // Check if the operation matches any in the list or if operations list is null (match all) + List operations = condition.getOperations(); + if (!(operations == null || operations.isEmpty() || operations.contains(operation))) { + continue; + } + // Check if any anomalyConditions detect an anomaly either through error code or latency + boolean isAnomaly = false; + + String errorCodeRegex = condition.getErrorCodeRegex(); + if (statusCode != null && errorCodeRegex != null) { + isAnomaly = statusCode.toString().matches(errorCodeRegex); + } + + Long highLatencyMs = condition.getHighLatencyMs(); + if (highLatencyMs != null) { + isAnomaly = + (errorCodeRegex == null || isAnomaly) + && (span.getLatencyNanos() / 1_000_000.0) >= highLatencyMs; + } + + if (isAnomaly) { + AwsXrayAdaptiveSamplingConfig.UsageType usage = condition.getUsage(); + if (usage != null) { + switch (usage) { + case BOTH: + shouldBoostSampling = true; + shouldCaptureAnomalySpan = true; + break; + case SAMPLING_BOOST: + shouldBoostSampling = true; + break; + case ANOMALY_TRACE_CAPTURE: + shouldCaptureAnomalySpan = true; + break; + default: // do nothing + } + } else { + shouldBoostSampling = true; + shouldCaptureAnomalySpan = true; + } + } + if (shouldBoostSampling && shouldCaptureAnomalySpan) { + break; + } + } + } else if ((statusCode != null && statusCode > 499) + || (statusCode == null + && spanData.getStatus() != null + && StatusCode.ERROR.equals(spanData.getStatus().getStatusCode()))) { + shouldBoostSampling = true; + shouldCaptureAnomalySpan = true; + } + + String traceId = spanData.getTraceId(); + AwsXrayAdaptiveSamplingConfig.UsageType existingUsage = traceUsageCache.getIfPresent(traceId); + boolean isNewTrace = existingUsage == null; + + // Anomaly Capture + boolean isSpanCaptured = false; + if (AwsXrayAdaptiveSamplingConfig.UsageType.isUsedForAnomalyTraceCapture(existingUsage) + || (shouldCaptureAnomalySpan + && !span.getSpanContext().isSampled() + && anomalyCaptureRateLimiter != null + && anomalyCaptureRateLimiter.trySpend(1))) { + spanBatcher.accept(span); + isSpanCaptured = true; + } + + // Sampling Boost + boolean isCountedAsAnomalyForBoost = false; + if (shouldBoostSampling || isNewTrace) { + String traceStateValue = + span.getSpanContext() + .getTraceState() + .get(AwsSamplingResult.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY); + String ruleNameForBoostStats = + traceStateValue != null + ? hashToRuleMap.getOrDefault(traceStateValue, traceStateValue) + : traceStateValue; + SamplingRuleApplier ruleToReportTo = null; + SamplingRuleApplier matchedRule = null; + for (SamplingRuleApplier applier : ruleAppliers) { + // Rule propagated from when sampling decision was made, otherwise the matched rule + if (applier.getRuleName().equals(ruleNameForBoostStats)) { + ruleToReportTo = applier; + break; + } + if (applier.matches(spanData.getAttributes(), resource)) { + matchedRule = applier; + } + } + if (ruleToReportTo == null) { + if (matchedRule == null) { + logger.log( + Level.FINE, + "No sampling rule matched the request. This is a bug in either the OpenTelemetry SDK or X-Ray."); + } else { + ruleToReportTo = matchedRule; + } + } + if (shouldBoostSampling + && ruleToReportTo != null + && ruleToReportTo.hasBoost() + && !AwsXrayAdaptiveSamplingConfig.UsageType.isUsedForBoost(existingUsage)) { + ruleToReportTo.countAnomalyTrace(span); + isCountedAsAnomalyForBoost = true; + } + if (isNewTrace && ruleToReportTo != null && ruleToReportTo.hasBoost()) { + ruleToReportTo.countTrace(); + } + } + + // Any interaction with a cache entry will reset the expiration timer of that entry + if (isSpanCaptured && isCountedAsAnomalyForBoost) { + this.traceUsageCache.put(traceId, AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); + } else if (isSpanCaptured) { + if (AwsXrayAdaptiveSamplingConfig.UsageType.isUsedForBoost(existingUsage)) { + this.traceUsageCache.put(traceId, AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); + } else { + this.traceUsageCache.put( + traceId, AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE); + } + } else if (isCountedAsAnomalyForBoost) { + if (AwsXrayAdaptiveSamplingConfig.UsageType.isUsedForAnomalyTraceCapture(existingUsage)) { + this.traceUsageCache.put(traceId, AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); + } else { + this.traceUsageCache.put(traceId, AwsXrayAdaptiveSamplingConfig.UsageType.SAMPLING_BOOST); + } + } else if (existingUsage != null) { + this.traceUsageCache.put(traceId, existingUsage); + } else { + this.traceUsageCache.put(traceId, AwsXrayAdaptiveSamplingConfig.UsageType.NEITHER); + } + } + + List snapshot(Date now) { return Arrays.stream(ruleAppliers) .map(rule -> rule.snapshot(now)) .filter(Objects::nonNull) @@ -115,15 +390,16 @@ XrayRulesSampler withTargets( Map ruleTargets, Set requestedTargetRuleNames, Date now) { + long currentNanoTime = clock.nanoTime(); long defaultNextSnapshotTimeNanos = - clock.nanoTime() + AwsXrayRemoteSampler.DEFAULT_TARGET_INTERVAL_NANOS; + currentNanoTime + AwsXrayRemoteSampler.DEFAULT_TARGET_INTERVAL_NANOS; SamplingRuleApplier[] newAppliers = Arrays.stream(ruleAppliers) .map( rule -> { SamplingTargetDocument target = ruleTargets.get(rule.getRuleName()); if (target != null) { - return rule.withTarget(target, now); + return rule.withTarget(target, now, currentNanoTime); } if (requestedTargetRuleNames.contains(rule.getRuleName())) { // In practice X-Ray should return a target for any rule we requested but @@ -135,6 +411,92 @@ XrayRulesSampler withTargets( return rule; }) .toArray(SamplingRuleApplier[]::new); - return new XrayRulesSampler(clientId, resource, clock, fallbackSampler, newAppliers); + return new XrayRulesSampler( + clientId, + resource, + clock, + fallbackSampler, + newAppliers, + ruleToHashMap, + adaptiveSamplingRuleExists, + adaptiveSamplingConfig, + traceUsageCache); + } + + static boolean isKeyPresent(SpanData span, AttributeKey key) { + return span.getAttributes().get(key) != null; + } + + private static String generateIngressOperation(SpanData span) { + String operation = UNKNOWN_OPERATION; + if (isKeyPresent(span, URL_PATH) || isKeyPresent(span, HTTP_TARGET)) { + String httpTarget = + isKeyPresent(span, URL_PATH) + ? span.getAttributes().get(URL_PATH) + : span.getAttributes().get(HTTP_TARGET); + // get the first part from API path string as operation value + // the more levels/parts we get from API path the higher chance for getting high cardinality + // data + if (httpTarget != null) { + operation = extractApiPathValue(httpTarget); + if (isKeyPresent(span, HTTP_REQUEST_METHOD) || isKeyPresent(span, HTTP_METHOD)) { + String httpMethod = + isKeyPresent(span, HTTP_REQUEST_METHOD) + ? span.getAttributes().get(HTTP_REQUEST_METHOD) + : span.getAttributes().get(HTTP_METHOD); + if (httpMethod != null) { + operation = httpMethod + " " + operation; + } + } + } + } + return operation; + } + + private static String extractApiPathValue(String httpTarget) { + if (httpTarget == null || httpTarget.isEmpty()) { + return "/"; + } + String[] paths = httpTarget.split("/"); + if (paths.length > 1) { + return "/" + paths[1]; + } + return "/"; + } + + private static Map createRuleHashMaps( + List rules) { + Map ruleToHashMap = new HashMap<>(); + for (GetSamplingRulesResponse.SamplingRule rule : rules) { + String ruleName = rule.getRuleName(); + if (ruleName != null) { + ruleToHashMap.put(ruleName, hashRuleName(ruleName)); + } + } + return ruleToHashMap; + } + + static String hashRuleName(String ruleName) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(ruleName.getBytes(StandardCharsets.UTF_8)); + StringBuilder hexString = new StringBuilder(); + for (int i = 0; i < Math.min(hash.length, 8); i++) { + String hex = Integer.toHexString(0xff & hash[i]); + if (hex.length() == 1) { + hexString.append('0'); + } + hexString.append(hex); + } + return hexString.toString(); + } catch (NoSuchAlgorithmException e) { + return ruleName; + } + } + + // For testing + Cache getTraceUsageCache() { + traceUsageCache.cleanUp(); + return traceUsageCache; } } diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java index d45e00ad2..d8f2afcae 100644 --- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java +++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/AwsXrayRemoteSamplerTest.java @@ -7,7 +7,10 @@ import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; import com.google.common.io.ByteStreams; import com.linecorp.armeria.common.HttpResponse; @@ -21,6 +24,9 @@ import io.opentelemetry.api.trace.TraceId; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.sdk.trace.samplers.SamplingDecision; import java.io.IOException; @@ -187,6 +193,31 @@ void parentBasedXraySamplerAfterDefaultSampler() { } } + void setAndResetSpanExporter() { + try (AwsXrayRemoteSampler sampler = AwsXrayRemoteSampler.newBuilder(Resource.empty()).build()) { + // Setting span exporter should only work once + sampler.setSpanExporter(mock(SpanExporter.class)); + assertThrows( + IllegalStateException.class, () -> sampler.setSpanExporter(mock(SpanExporter.class))); + } + } + + @Test + void adaptSamplingWithoutSpanExporter() { + assertThrows( + IllegalStateException.class, + () -> sampler.adaptSampling(mock(ReadableSpan.class), mock(SpanData.class))); + } + + @Test + void adaptSamplingWithSpanExporter() { + try (AwsXrayRemoteSampler sampler = AwsXrayRemoteSampler.newBuilder(Resource.empty()).build()) { + sampler.setSpanExporter(mock(SpanExporter.class)); + assertThatCode(() -> sampler.adaptSampling(mock(ReadableSpan.class), mock(SpanData.class))) + .doesNotThrowAnyException(); + } + } + // https://github.com/open-telemetry/opentelemetry-java-contrib/issues/376 @Test void testJitterTruncation() { @@ -205,6 +236,16 @@ void testJitterTruncation() { } } + @Test + void setAdaptiveSamplingConfig() { + try (AwsXrayRemoteSampler sampler = AwsXrayRemoteSampler.newBuilder(Resource.empty()).build()) { + AwsXrayAdaptiveSamplingConfig config = + AwsXrayAdaptiveSamplingConfig.builder().setVersion(1.0).build(); + sampler.setAdaptiveSamplingConfig(config); + assertThrows(IllegalStateException.class, () -> sampler.setAdaptiveSamplingConfig(config)); + } + } + private static SamplingDecision doSample(Sampler sampler, String name) { return sampler .shouldSample( diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java index 920a5ffd4..dcc7118a2 100644 --- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java +++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/SamplingRuleApplierTest.java @@ -15,18 +15,25 @@ import static io.opentelemetry.semconv.incubating.NetIncubatingAttributes.NET_HOST_NAME; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.ObjectMapper; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.context.Context; +import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingBoost; import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.testing.time.TestClock; +import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.samplers.SamplingDecision; import io.opentelemetry.sdk.trace.samplers.SamplingResult; import io.opentelemetry.semconv.HttpAttributes; @@ -37,6 +44,7 @@ import java.io.UncheckedIOException; import java.time.Duration; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.Date; import java.util.concurrent.TimeUnit; @@ -50,6 +58,7 @@ class SamplingRuleApplierTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String CLIENT_ID = "test-client-id"; + private static final String TEST_SERVICE_NAME = "test-service-name"; @Nested @SuppressWarnings("ClassCanBeStatic") @@ -57,7 +66,10 @@ class ExactMatch { private final SamplingRuleApplier applier = new SamplingRuleApplier( - CLIENT_ID, readSamplingRule("/sampling-rule-exactmatch.json"), Clock.getDefault()); + CLIENT_ID, + readSamplingRule("/sampling-rule-exactmatch.json"), + TEST_SERVICE_NAME, + Clock.getDefault()); private final Resource resource = Resource.builder() @@ -91,7 +103,8 @@ void fixedRateAlwaysSample() { .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); Date now = new Date(); - GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = applier.snapshot(now); + GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = + applier.snapshot(now).getStatisticsDocument(); assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); assertThat(statistics.getRuleName()).isEqualTo("Test"); assertThat(statistics.getTimestamp()).isEqualTo(now); @@ -100,7 +113,7 @@ void fixedRateAlwaysSample() { assertThat(statistics.getBorrowCount()).isEqualTo(0); // Reset - statistics = applier.snapshot(now); + statistics = applier.snapshot(now).getStatisticsDocument(); assertThat(statistics.getRequestCount()).isEqualTo(0); assertThat(statistics.getSampledCount()).isEqualTo(0); assertThat(statistics.getBorrowCount()).isEqualTo(0); @@ -108,7 +121,7 @@ void fixedRateAlwaysSample() { doSample(applier); doSample(applier); now = new Date(); - statistics = applier.snapshot(now); + statistics = applier.snapshot(now).getStatisticsDocument(); assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); assertThat(statistics.getRuleName()).isEqualTo("Test"); assertThat(statistics.getTimestamp()).isEqualTo(now); @@ -283,7 +296,10 @@ class WildcardMatch { private final SamplingRuleApplier applier = new SamplingRuleApplier( - CLIENT_ID, readSamplingRule("/sampling-rule-wildcards.json"), Clock.getDefault()); + CLIENT_ID, + readSamplingRule("/sampling-rule-wildcards.json"), + TEST_SERVICE_NAME, + Clock.getDefault()); private final Resource resource = Resource.builder() @@ -316,7 +332,8 @@ void fixedRateNeverSample() { assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); Date now = new Date(); - GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = applier.snapshot(now); + GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = + applier.snapshot(now).getStatisticsDocument(); assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); assertThat(statistics.getRuleName()).isEqualTo("Test"); assertThat(statistics.getTimestamp()).isEqualTo(now); @@ -325,7 +342,7 @@ void fixedRateNeverSample() { assertThat(statistics.getBorrowCount()).isEqualTo(0); // Reset - statistics = applier.snapshot(now); + statistics = applier.snapshot(now).getStatisticsDocument(); assertThat(statistics.getRequestCount()).isEqualTo(0); assertThat(statistics.getSampledCount()).isEqualTo(0); assertThat(statistics.getBorrowCount()).isEqualTo(0); @@ -333,7 +350,7 @@ void fixedRateNeverSample() { doSample(applier); doSample(applier); now = new Date(); - statistics = applier.snapshot(now); + statistics = applier.snapshot(now).getStatisticsDocument(); assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); assertThat(statistics.getRuleName()).isEqualTo("Test"); assertThat(statistics.getTimestamp()).isEqualTo(now); @@ -626,7 +643,10 @@ class AwsLambdaTest { private final SamplingRuleApplier applier = new SamplingRuleApplier( - CLIENT_ID, readSamplingRule("/sampling-rule-awslambda.json"), Clock.getDefault()); + CLIENT_ID, + readSamplingRule("/sampling-rule-awslambda.json"), + TEST_SERVICE_NAME, + Clock.getDefault()); private final Resource resource = Resource.builder() @@ -677,7 +697,10 @@ void notLambdaNotMatches() { void borrowing() { SamplingRuleApplier applier = new SamplingRuleApplier( - CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), Clock.getDefault()); + CLIENT_ID, + readSamplingRule("/sampling-rule-reservoir.json"), + TEST_SERVICE_NAME, + Clock.getDefault()); // Borrow assertThat(doSample(applier)) @@ -688,7 +711,8 @@ void borrowing() { assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); Date now = new Date(); - GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = applier.snapshot(now); + GetSamplingTargetsRequest.SamplingStatisticsDocument statistics = + applier.snapshot(now).getStatisticsDocument(); assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); assertThat(statistics.getRuleName()).isEqualTo("Test"); assertThat(statistics.getTimestamp()).isEqualTo(now); @@ -697,7 +721,7 @@ void borrowing() { assertThat(statistics.getBorrowCount()).isEqualTo(1); // Reset - statistics = applier.snapshot(now); + statistics = applier.snapshot(now).getStatisticsDocument(); assertThat(statistics.getRequestCount()).isEqualTo(0); assertThat(statistics.getSampledCount()).isEqualTo(0); assertThat(statistics.getBorrowCount()).isEqualTo(0); @@ -713,7 +737,7 @@ void borrowing() { }); now = new Date(); - statistics = applier.snapshot(now); + statistics = applier.snapshot(now).getStatisticsDocument(); assertThat(statistics.getClientId()).isEqualTo(CLIENT_ID); assertThat(statistics.getRuleName()).isEqualTo("Test"); assertThat(statistics.getTimestamp()).isEqualTo(now); @@ -727,7 +751,7 @@ void ruleWithTarget() { TestClock clock = TestClock.create(); SamplingRuleApplier applier = new SamplingRuleApplier( - CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), clock); + CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), TEST_SERVICE_NAME, clock); // No target yet, borrows from reservoir every second. assertThat(doSample(applier)) .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); @@ -746,8 +770,8 @@ void ruleWithTarget() { // Got a target! SamplingTargetDocument target = - SamplingTargetDocument.create(0.0, 5, 2, Date.from(now.plusSeconds(10)), "test"); - applier = applier.withTarget(target, Date.from(now)); + SamplingTargetDocument.create(0.0, 5, 2, Date.from(now.plusSeconds(10)), null, "test"); + applier = applier.withTarget(target, Date.from(now), clock.nanoTime()); // Statistics not expired yet assertThat(applier.snapshot(Date.from(now))).isNull(); @@ -786,7 +810,7 @@ void ruleWithTargetWithoutQuota() { TestClock clock = TestClock.create(); SamplingRuleApplier applier = new SamplingRuleApplier( - CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), clock); + CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), TEST_SERVICE_NAME, clock); // No target yet, borrows from reservoir every second. assertThat(doSample(applier)) .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); @@ -804,8 +828,8 @@ void ruleWithTargetWithoutQuota() { assertThat(applier.snapshot(Date.from(now.plus(Duration.ofMinutes(30))))).isNotNull(); // Got a target! - SamplingTargetDocument target = SamplingTargetDocument.create(0.0, 5, null, null, "test"); - applier = applier.withTarget(target, Date.from(now)); + SamplingTargetDocument target = SamplingTargetDocument.create(0.0, 5, null, null, null, "test"); + applier = applier.withTarget(target, Date.from(now), clock.nanoTime()); // No reservoir, always use fixed rate (drop) assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); @@ -815,12 +839,105 @@ void ruleWithTargetWithoutQuota() { assertThat(applier.snapshot(Date.from(now))).isNotNull(); } + @Test + void ruleWithBoost() { + TestClock clock = TestClock.create(); + SamplingRuleApplier applier = + new SamplingRuleApplier( + CLIENT_ID, readSamplingRule("/sampling-rule-boost.json"), TEST_SERVICE_NAME, clock); + // No reservoir, always use fixed rate (drop) + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + + Instant now = Instant.ofEpochSecond(0, clock.now()); + + // Got a target! + // Boost raises sampling rate to 100% for 20 seconds + SamplingTargetDocument target = + SamplingTargetDocument.create( + 0.0, + 5, + null, + null, + SamplingBoost.create(1.0, Date.from(now.plus(20, ChronoUnit.SECONDS))), + "test"); + applier = applier.withTarget(target, Date.from(now), clock.nanoTime()); + + // We should start sampling at this point + assertThat(doSample(applier)) + .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + assertThat(doSample(applier)) + .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + // After waiting 10 seconds, we should still be sampling + clock.advance(Duration.ofSeconds(10)); + assertThat(doSample(applier)) + .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + assertThat(doSample(applier)) + .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + // After 30 seconds, we should stop sampling + clock.advance(Duration.ofSeconds(20)); + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + } + + @Test + void countTrace() { + TestClock clock = TestClock.create(); + SamplingRuleApplier applier = + new SamplingRuleApplier( + CLIENT_ID, readSamplingRule("/sampling-rule-boost.json"), TEST_SERVICE_NAME, clock); + + Instant now = Instant.ofEpochSecond(0, clock.now()); + + SamplingRuleApplier.SamplingRuleStatisticsSnapshot snapshot = applier.snapshot(Date.from(now)); + assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); + + applier.countTrace(); + applier.countTrace(); + applier.countTrace(); + + snapshot = applier.snapshot(Date.from(now)); + assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(3); + assertThat(snapshot.getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); + + // Snapshotting again should've reset the statistics + snapshot = applier.snapshot(Date.from(now)); + assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); + assertThat(snapshot.getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); + + // Decision to separate by trace ID is made in XrayRulesSampler class, so we can ignore + // trace/span ID in span context here + ReadableSpan readableSpanMock = mock(ReadableSpan.class); + // Mock sampling the first two traces + when(readableSpanMock.getSpanContext()) + .thenReturn( + SpanContext.create( + "TRACE_ID", "SPAN_ID", TraceFlags.getSampled(), TraceState.getDefault())); + applier.countTrace(); + applier.countAnomalyTrace(readableSpanMock); + applier.countTrace(); + applier.countAnomalyTrace(readableSpanMock); + + // Mock not sampling the last trace + when(readableSpanMock.getSpanContext()) + .thenReturn( + SpanContext.create( + "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); + applier.countTrace(); + applier.countAnomalyTrace(readableSpanMock); + + snapshot = applier.snapshot(Date.from(now)); + assertThat(snapshot.getBoostStatisticsDocument().getTotalCount()).isEqualTo(3); + assertThat(snapshot.getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(3); + assertThat(snapshot.getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(2); + } + @Test void withNextSnapshotTime() { TestClock clock = TestClock.create(); SamplingRuleApplier applier = new SamplingRuleApplier( - CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), clock); + CLIENT_ID, readSamplingRule("/sampling-rule-reservoir.json"), TEST_SERVICE_NAME, clock); Instant now = Instant.ofEpochSecond(0, clock.now()); assertThat(applier.snapshot(Date.from(now))).isNotNull(); @@ -839,6 +956,71 @@ void withNextSnapshotTime() { assertThat(doSample(applier)).isEqualTo(SamplingResult.create(SamplingDecision.DROP)); } + @Test + void hasBoostMethod() { + SamplingRuleApplier applierWithBoost = + new SamplingRuleApplier( + CLIENT_ID, + readSamplingRule("/sampling-rule-boost.json"), + TEST_SERVICE_NAME, + Clock.getDefault()); + assertThat(applierWithBoost.hasBoost()).isTrue(); + + SamplingRuleApplier applierWithoutBoost = + new SamplingRuleApplier( + CLIENT_ID, + readSamplingRule("/sampling-rule-exactmatch.json"), + TEST_SERVICE_NAME, + Clock.getDefault()); + assertThat(applierWithoutBoost.hasBoost()).isFalse(); + } + + @Test + void getServiceNameMethod() { + SamplingRuleApplier applier = + new SamplingRuleApplier( + CLIENT_ID, + readSamplingRule("/sampling-rule-exactmatch.json"), + TEST_SERVICE_NAME, + Clock.getDefault()); + assertThat(applier.getServiceName()).isEqualTo(TEST_SERVICE_NAME); + } + + @Test + void nullRuleName() { + GetSamplingRulesResponse.SamplingRule ruleWithNullName = + GetSamplingRulesResponse.SamplingRule.create( + Collections.emptyMap(), + 1.0, + "*", + "*", + 1, + 0, + "*", + null, // null rule name + null, + "*", + "*", + "*", + 1, + null); + + SamplingRuleApplier applier = + new SamplingRuleApplier(CLIENT_ID, ruleWithNullName, TEST_SERVICE_NAME, Clock.getDefault()); + assertThat(applier.getRuleName()).isEqualTo("default"); + } + + @Test + void nullServiceName() { + SamplingRuleApplier applier = + new SamplingRuleApplier( + CLIENT_ID, + readSamplingRule("/sampling-rule-exactmatch.json"), + null, // null service name + Clock.getDefault()); + assertThat(applier.getServiceName()).isEqualTo("default"); + } + private static SamplingResult doSample(SamplingRuleApplier applier) { return applier.shouldSample( Context.current(), diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java index 1ca8df347..72ec524b1 100644 --- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java +++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XrayRulesSamplerTest.java @@ -5,17 +5,28 @@ package io.opentelemetry.contrib.awsxray; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.context.Context; +import io.opentelemetry.contrib.awsxray.GetSamplingRulesResponse.SamplingRateBoost; import io.opentelemetry.contrib.awsxray.GetSamplingRulesResponse.SamplingRule; import io.opentelemetry.contrib.awsxray.GetSamplingTargetsResponse.SamplingTargetDocument; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.testing.time.TestClock; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.sdk.trace.samplers.SamplingDecision; import io.opentelemetry.sdk.trace.samplers.SamplingResult; @@ -25,14 +36,20 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.jupiter.api.Test; class XrayRulesSamplerTest { + private static final AttributeKey URL_PATH = AttributeKey.stringKey("url.path"); + private static final AttributeKey HTTP_METHOD = AttributeKey.stringKey("http.method"); + @Test void updateTargets() { SamplingRule rule1 = @@ -49,7 +66,8 @@ void updateTargets() { "*", "*", "*", - 1); + 1, + null); SamplingRule rule2 = SamplingRule.create( Collections.singletonMap("test", "dog-service"), @@ -64,7 +82,8 @@ void updateTargets() { "*", "*", "*", - 1); + 1, + null); SamplingRule rule3 = SamplingRule.create( Collections.singletonMap("test", "*-service"), @@ -79,7 +98,8 @@ void updateTargets() { "*", "*", "*", - 1); + 1, + null); SamplingRule rule4 = SamplingRule.create( Collections.emptyMap(), @@ -94,7 +114,8 @@ void updateTargets() { "*", "*", "*", - 1); + 1, + null); TestClock clock = TestClock.create(); XrayRulesSampler sampler = @@ -103,22 +124,58 @@ void updateTargets() { Resource.getDefault(), clock, Sampler.alwaysOn(), - Arrays.asList(rule1, rule4, rule3, rule2)); + Arrays.asList(rule1, rule4, rule3, rule2), + null); assertThat(doSample(sampler, "cat-service")) - .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.empty(), + XrayRulesSampler.hashRuleName("cat-rule"))); assertThat(doSample(sampler, "cat-service")) - .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.empty(), + XrayRulesSampler.hashRuleName("cat-rule"))); assertThat(doSample(sampler, "dog-service")) - .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.empty(), + XrayRulesSampler.hashRuleName("dog-rule"))); assertThat(doSample(sampler, "dog-service")) - .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.empty(), + XrayRulesSampler.hashRuleName("dog-rule"))); assertThat(doSample(sampler, "bat-service")) - .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.empty(), + XrayRulesSampler.hashRuleName("bat-rule"))); assertThat(doSample(sampler, "bat-service")) - .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.empty(), + XrayRulesSampler.hashRuleName("bat-rule"))); assertThat(doSample(sampler, "unknown")) - .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.empty(), + XrayRulesSampler.hashRuleName("default-rule"))); Instant now = Instant.ofEpochSecond(0, clock.now()); assertThat(sampler.snapshot(Date.from(now))).hasSize(4); @@ -128,10 +185,10 @@ void updateTargets() { assertThat(sampler.snapshot(Date.from(now))).hasSize(4); SamplingTargetDocument catTarget = - SamplingTargetDocument.create(0.0, 10, null, null, "cat-rule"); + SamplingTargetDocument.create(0.0, 10, null, null, null, "cat-rule"); SamplingTargetDocument batTarget = - SamplingTargetDocument.create(0.0, 5, null, null, "bat-rule"); + SamplingTargetDocument.create(0.0, 5, null, null, null, "bat-rule"); clock.advance(Duration.ofSeconds(10)); now = Instant.ofEpochSecond(0, clock.now()); @@ -145,16 +202,41 @@ void updateTargets() { .collect(Collectors.toSet()), Date.from(now)); assertThat(doSample(sampler, "dog-service")) - .isEqualTo(SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.empty(), + XrayRulesSampler.hashRuleName("dog-rule"))); assertThat(doSample(sampler, "dog-service")) - .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.empty(), + XrayRulesSampler.hashRuleName("dog-rule"))); assertThat(doSample(sampler, "unknown")) - .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.empty(), + XrayRulesSampler.hashRuleName("default-rule"))); // Targets overridden to always drop. assertThat(doSample(sampler, "cat-service")) - .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.empty(), + XrayRulesSampler.hashRuleName("cat-rule"))); assertThat(doSample(sampler, "bat-service")) - .isEqualTo(SamplingResult.create(SamplingDecision.DROP)); + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.empty(), + XrayRulesSampler.hashRuleName("bat-rule"))); // Minimum is batTarget, 5s from now assertThat(sampler.nextTargetFetchTimeNanos()) @@ -169,6 +251,867 @@ void updateTargets() { assertThat(sampler.snapshot(Date.from(now))).hasSize(4); } + @Test + void updateTargetsWithLocalAdaptiveSamplingConfig() { + SamplingRule rule1 = + SamplingRule.create( + Collections.singletonMap("test", "cat-service"), + 1.0, + "*", + "*", + 1, + 1, + "*", + "*", + "cat-rule", + "*", + "*", + "*", + 1, + null); + SamplingRule rule2 = + SamplingRule.create( + Collections.singletonMap("test", "dog-service"), + 0.0, + "*", + "*", + 2, + 1, + "*", + "*", + "dog-rule", + "*", + "*", + "*", + 1, + null); + SamplingRule rule3 = + SamplingRule.create( + Collections.singletonMap("test", "*-service"), + 1.0, + "*", + "*", + 3, + 1, + "*", + "*", + "bat-rule", + "*", + "*", + "*", + 1, + null); + SamplingRule rule4 = + SamplingRule.create( + Collections.emptyMap(), + 0.0, + "*", + "*", + 4, + 0, + "*", + "*", + "default-rule", + "*", + "*", + "*", + 1, + null); + AwsXrayAdaptiveSamplingConfig config = + AwsXrayAdaptiveSamplingConfig.builder() + .setVersion(1.0) + .setAnomalyCaptureLimit( + AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() + .setAnomalyTracesPerSecond(2) + .build()) + .build(); + + TestClock clock = TestClock.create(); + XrayRulesSampler sampler = + new XrayRulesSampler( + "CLIENT_ID", + Resource.getDefault(), + clock, + Sampler.alwaysOn(), + Arrays.asList(rule1, rule4, rule3, rule2), + config); + + assertThat(doSample(sampler, "cat-service")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "cat-rule") + .build(), + XrayRulesSampler.hashRuleName("cat-rule"))); + assertThat(doSample(sampler, "cat-service")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "cat-rule") + .build(), + XrayRulesSampler.hashRuleName("cat-rule"))); + assertThat(doSample(sampler, "dog-service")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") + .build(), + XrayRulesSampler.hashRuleName("dog-rule"))); + assertThat(doSample(sampler, "dog-service")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") + .build(), + XrayRulesSampler.hashRuleName("dog-rule"))); + assertThat(doSample(sampler, "bat-service")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "bat-rule") + .build(), + XrayRulesSampler.hashRuleName("bat-rule"))); + assertThat(doSample(sampler, "bat-service")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "bat-rule") + .build(), + XrayRulesSampler.hashRuleName("bat-rule"))); + assertThat(doSample(sampler, "unknown")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "default-rule") + .build(), + XrayRulesSampler.hashRuleName("default-rule"))); + + Instant now = Instant.ofEpochSecond(0, clock.now()); + assertThat(sampler.snapshot(Date.from(now))).hasSize(4); + assertThat(sampler.nextTargetFetchTimeNanos()).isEqualTo(clock.nanoTime()); + clock.advance(Duration.ofSeconds(10)); + now = Instant.ofEpochSecond(0, clock.now()); + assertThat(sampler.snapshot(Date.from(now))).hasSize(4); + + SamplingTargetDocument catTarget = + SamplingTargetDocument.create(0.0, 10, null, null, null, "cat-rule"); + + SamplingTargetDocument batTarget = + SamplingTargetDocument.create(0.0, 5, null, null, null, "bat-rule"); + + clock.advance(Duration.ofSeconds(10)); + now = Instant.ofEpochSecond(0, clock.now()); + Map targets = new HashMap<>(); + targets.put("cat-rule", catTarget); + targets.put("bat-rule", batTarget); + sampler = + sampler.withTargets( + targets, + Stream.of("cat-rule", "bat-rule", "dog-rule", "default-rule") + .collect(Collectors.toSet()), + Date.from(now)); + assertThat(doSample(sampler, "dog-service")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.RECORD_AND_SAMPLE, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") + .build(), + XrayRulesSampler.hashRuleName("dog-rule"))); + assertThat(doSample(sampler, "dog-service")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "dog-rule") + .build(), + XrayRulesSampler.hashRuleName("dog-rule"))); + assertThat(doSample(sampler, "unknown")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "default-rule") + .build(), + XrayRulesSampler.hashRuleName("default-rule"))); + // Targets overridden to always drop. + assertThat(doSample(sampler, "cat-service")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "cat-rule") + .build(), + XrayRulesSampler.hashRuleName("cat-rule"))); + assertThat(doSample(sampler, "bat-service")) + .usingRecursiveComparison() + .isEqualTo( + AwsSamplingResult.create( + SamplingDecision.DROP, + Attributes.builder() + .put(XrayRulesSampler.AWS_XRAY_SAMPLING_RULE, "bat-rule") + .build(), + XrayRulesSampler.hashRuleName("bat-rule"))); + + // Minimum is batTarget, 5s from now + assertThat(sampler.nextTargetFetchTimeNanos()) + .isEqualTo(clock.nanoTime() + TimeUnit.SECONDS.toNanos(5)); + + assertThat(sampler.snapshot(Date.from(now))).isEmpty(); + clock.advance(Duration.ofSeconds(5)); + now = Instant.ofEpochSecond(0, clock.now()); + assertThat(sampler.snapshot(Date.from(now))).hasSize(1); + clock.advance(Duration.ofSeconds(5)); + now = Instant.ofEpochSecond(0, clock.now()); + assertThat(sampler.snapshot(Date.from(now))).hasSize(4); + } + + @Test + void noAdaptiveSamplingUsesNoSpace() { + SamplingRule rule1 = + SamplingRule.create( + Collections.singletonMap("test", "cat-service"), + 1.0, + "*", + "*", + 1, + 1, + "*", + "*", + "cat-rule", + "*", + "*", + "*", + 1, + null); + + TestClock clock = TestClock.create(); + XrayRulesSampler sampler = + new XrayRulesSampler( + "CLIENT_ID", + Resource.getDefault(), + clock, + Sampler.alwaysOn(), + Arrays.asList(rule1), + null); + + LongAdder exportCounter = new LongAdder(); + ReadableSpan readableSpanMock = mock(ReadableSpan.class); + SpanData spanDataMock = mock(SpanData.class); + Consumer stubbedConsumer = x -> exportCounter.increment(); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(sampler.getTraceUsageCache().asMap().size()).isEqualTo(0); + } + + @Test + void recordErrors() { + SamplingRule rule1 = + SamplingRule.create( + Collections.singletonMap("test", "cat-service"), + 1.0, + "*", + "*", + 1, + 1, + "*", + "*", + "cat-rule", + "*", + "*", + "*", + 1, + null); + SamplingRule rule2 = + SamplingRule.create( + Collections.emptyMap(), + 0.0, + "*", + "*", + 4, + 0, + "*", + "*", + "default-rule", + "*", + "*", + "*", + 1, + SamplingRateBoost.create(1, 300)); + AwsXrayAdaptiveSamplingConfig config = + AwsXrayAdaptiveSamplingConfig.builder() + .setVersion(1.0) + .setAnomalyCaptureLimit( + AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() + .setAnomalyTracesPerSecond(2) + .build()) + .setAnomalyConditions( + Arrays.asList( + AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() + .setErrorCodeRegex("^500$") + .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH) + .build())) + .build(); + + TestClock clock = TestClock.create(); + XrayRulesSampler sampler = + new XrayRulesSampler( + "CLIENT_ID", + Resource.getDefault(), + clock, + Sampler.alwaysOn(), + Arrays.asList(rule1, rule2), + config); + + Instant now = Instant.ofEpochSecond(0, clock.now()); + + ReadableSpan readableSpanMock = mock(ReadableSpan.class); + when(readableSpanMock.getSpanContext()) + .thenReturn( + SpanContext.create( + "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); + SpanData spanDataMock = mock(SpanData.class); + Attributes attributesMock = mock(Attributes.class); + when(spanDataMock.getAttributes()).thenReturn(attributesMock); + when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(500L); + LongAdder exportCounter = new LongAdder(); + Consumer stubbedConsumer = x -> exportCounter.increment(); + + // First span should be captured, second should be rate limited + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + // Only first span captured due to rate limiting + assertThat(exportCounter.sumThenReset()).isEqualTo(2L); + + List snapshot = + sampler.snapshot(Date.from(now)); + + // Rules are ordered by priority, so cat-rule is first + assertThat(snapshot.get(0).getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); + assertThat(snapshot.get(0).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); + + assertThat(snapshot.get(0).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); + assertThat(snapshot.get(1).getBoostStatisticsDocument().getTotalCount()).isEqualTo(3); + assertThat(snapshot.get(1).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(3); + + assertThat(snapshot.get(1).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); + + // Mock trace coming from upstream service where it was sampled by cat-rule + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID4"); + when(readableSpanMock.getSpanContext()) + .thenReturn( + SpanContext.create( + "TRACE_ID4", + "SPAN_ID", + TraceFlags.getDefault(), + TraceState.builder() + .put( + AwsSamplingResult.AWS_XRAY_SAMPLING_RULE_TRACE_STATE_KEY, + XrayRulesSampler.hashRuleName("cat-rule")) + .build())); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + + // Ensure snapshot shows correctly saved statistics + snapshot = sampler.snapshot(Date.from(now)); + // cat-rule has no boost config and therefore records no statistics + assertThat(snapshot.get(0).getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); + assertThat(snapshot.get(0).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); + assertThat(snapshot.get(0).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); + assertThat(snapshot.get(1).getBoostStatisticsDocument().getTotalCount()).isEqualTo(0); + assertThat(snapshot.get(1).getBoostStatisticsDocument().getAnomalyCount()).isEqualTo(0); + assertThat(snapshot.get(1).getBoostStatisticsDocument().getSampledAnomalyCount()).isEqualTo(0); + + // Assert the trace ID cache is filled with appropriate data and is cleared after TTL passes + assertThat(sampler.getTraceUsageCache().asMap().size()).isEqualTo(4); + clock.advance(Duration.ofMinutes(100)); + assertThat(sampler.getTraceUsageCache().asMap().size()).isEqualTo(0); + } + + @Test + void setAdaptiveSamplingConfigTwice() { + SamplingRule rule1 = + SamplingRule.create( + Collections.emptyMap(), + 1.0, + "*", + "*", + 1, + 1, + "*", + "*", + "test-rule", + "*", + "*", + "*", + 1, + null); + + TestClock clock = TestClock.create(); + XrayRulesSampler sampler = + new XrayRulesSampler( + "CLIENT_ID", + Resource.getDefault(), + clock, + Sampler.alwaysOn(), + Arrays.asList(rule1), + null); + + AwsXrayAdaptiveSamplingConfig config = + AwsXrayAdaptiveSamplingConfig.builder().setVersion(1.0).build(); + sampler.setAdaptiveSamplingConfig(config); + assertThrows(IllegalStateException.class, () -> sampler.setAdaptiveSamplingConfig(config)); + } + + @Test + void captureErrorBasedOnErrorCodeRegex() { + SamplingRule rule1 = + SamplingRule.create( + Collections.emptyMap(), + 0.0, + "*", + "*", + 1, + 0, + "*", + "*", + "test-rule", + "*", + "*", + "*", + 1, + SamplingRateBoost.create(1, 300)); + + TestClock clock = TestClock.create(); + AwsXrayAdaptiveSamplingConfig config = + AwsXrayAdaptiveSamplingConfig.builder() + .setVersion(1.0) + .setAnomalyCaptureLimit( + AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() + .setAnomalyTracesPerSecond(2) + .build()) + .setAnomalyConditions( + Arrays.asList( + AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() + .setErrorCodeRegex("^456$") + .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH) + .build())) + .build(); + XrayRulesSampler sampler = + new XrayRulesSampler( + "CLIENT_ID", + Resource.getDefault(), + clock, + Sampler.alwaysOn(), + Arrays.asList(rule1), + config); + + ReadableSpan readableSpanMock = mock(ReadableSpan.class); + when(readableSpanMock.getSpanContext()) + .thenReturn( + SpanContext.create( + "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); + when(readableSpanMock.getAttribute(any())).thenReturn("test-operation"); + when(readableSpanMock.getLatencyNanos()).thenReturn(1L); + + SpanData spanDataMock = mock(SpanData.class); + Attributes attributesMock = mock(Attributes.class); + when(spanDataMock.getAttributes()).thenReturn(attributesMock); + when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L); + + LongAdder exportCounter = new LongAdder(); + Consumer stubbedConsumer = x -> exportCounter.increment(); + + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sum()).isEqualTo(2L); + } + + @Test + void captureErrorBasedOnHighLatency() { + SamplingRule rule1 = + SamplingRule.create( + Collections.emptyMap(), + 0.0, + "*", + "*", + 1, + 0, + "*", + "*", + "test-rule", + "*", + "*", + "*", + 1, + SamplingRateBoost.create(1, 300)); + + TestClock clock = TestClock.create(); + AwsXrayAdaptiveSamplingConfig config = + AwsXrayAdaptiveSamplingConfig.builder() + .setVersion(1.0) + .setAnomalyCaptureLimit( + AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() + .setAnomalyTracesPerSecond(2) + .build()) + .setAnomalyConditions( + Arrays.asList( + AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() + .setHighLatencyMs(100L) + .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE) + .build())) + .build(); + XrayRulesSampler sampler = + new XrayRulesSampler( + "CLIENT_ID", + Resource.getDefault(), + clock, + Sampler.alwaysOn(), + Arrays.asList(rule1), + config); + + ReadableSpan readableSpanMock = mock(ReadableSpan.class); + when(readableSpanMock.getSpanContext()) + .thenReturn( + SpanContext.create( + "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); + when(readableSpanMock.getAttribute(any())).thenReturn("test-operation"); + when(readableSpanMock.getLatencyNanos()).thenReturn(300_000_000L); // 300 ms + + SpanData spanDataMock = mock(SpanData.class); + Attributes attributesMock = mock(Attributes.class); + when(spanDataMock.getAttributes()).thenReturn(attributesMock); + when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(200L); + + LongAdder exportCounter = new LongAdder(); + Consumer stubbedConsumer = x -> exportCounter.add(1); + + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sum()).isEqualTo(2L); + } + + @Test + void captureErrorBasedOnErroCodeAndLatency() { + SamplingRule rule1 = + SamplingRule.create( + Collections.emptyMap(), + 0.0, + "*", + "*", + 1, + 0, + "*", + "*", + "test-rule", + "*", + "*", + "*", + 1, + SamplingRateBoost.create(1, 300)); + + TestClock clock = TestClock.create(); + AwsXrayAdaptiveSamplingConfig config = + AwsXrayAdaptiveSamplingConfig.builder() + .setVersion(1.0) + .setAnomalyCaptureLimit( + AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() + .setAnomalyTracesPerSecond(2) + .build()) + .setAnomalyConditions( + Arrays.asList( + AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() + .setErrorCodeRegex("^456$") + .setHighLatencyMs(100L) + .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE) + .build())) + .build(); + XrayRulesSampler sampler = + new XrayRulesSampler( + "CLIENT_ID", + Resource.getDefault(), + clock, + Sampler.alwaysOn(), + Arrays.asList(rule1), + config); + + ReadableSpan readableSpanMock = mock(ReadableSpan.class); + when(readableSpanMock.getSpanContext()) + .thenReturn( + SpanContext.create( + "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); + when(readableSpanMock.getAttribute(any())).thenReturn("test-operation"); + when(readableSpanMock.getLatencyNanos()).thenReturn(300_000_000L); // 300 ms + + SpanData spanDataMock = mock(SpanData.class); + Attributes attributesMock = mock(Attributes.class); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID"); + when(spanDataMock.getAttributes()).thenReturn(attributesMock); + when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(200L); + + LongAdder exportCounter = new LongAdder(); + Consumer stubbedConsumer = x -> exportCounter.add(1); + + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sum()).isEqualTo(0L); + + when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L); + when(readableSpanMock.getLatencyNanos()).thenReturn(1L); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sum()).isEqualTo(0L); + + when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(456L); + when(readableSpanMock.getLatencyNanos()).thenReturn(300_000_000L); // 300 ms + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID4"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID5"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sum()).isEqualTo(2L); + } + + @Test + void recordAndCaptureErrorBasedOnSeparateConditions() { + SamplingRule rule1 = + SamplingRule.create( + Collections.emptyMap(), + 0.0, + "*", + "*", + 1, + 0, + "*", + "*", + "test-rule", + "*", + "*", + "*", + 1, + SamplingRateBoost.create(1, 300)); + + TestClock clock = TestClock.create(); + AwsXrayAdaptiveSamplingConfig config = + AwsXrayAdaptiveSamplingConfig.builder() + .setVersion(1.0) + .setAnomalyCaptureLimit( + AwsXrayAdaptiveSamplingConfig.AnomalyCaptureLimit.builder() + .setAnomalyTracesPerSecond(10) + .build()) + .setAnomalyConditions( + Arrays.asList( + AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() + .setErrorCodeRegex("^5\\d\\d$") + .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.SAMPLING_BOOST) + .build(), + AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() + .setErrorCodeRegex("^4\\d\\d$") + .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE) + .build())) + .build(); + XrayRulesSampler sampler = + new XrayRulesSampler( + "CLIENT_ID", + Resource.getDefault(), + clock, + Sampler.alwaysOn(), + Arrays.asList(rule1), + config); + + ReadableSpan readableSpanMock = mock(ReadableSpan.class); + when(readableSpanMock.getSpanContext()) + .thenReturn( + SpanContext.create( + "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); + + SpanData spanDataMock = mock(SpanData.class); + Attributes attributesMock = mock(Attributes.class); + when(spanDataMock.getAttributes()).thenReturn(attributesMock); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID"); + LongAdder exportCounter = new LongAdder(); + Consumer stubbedConsumer = x -> exportCounter.add(1); + + // Boost condition triggered - count new trace + count anomaly + when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(511L); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(sampler.getTraceUsageCache().getIfPresent("TRACE_ID")) + .isEqualTo(AwsXrayAdaptiveSamplingConfig.UsageType.SAMPLING_BOOST); + assertThat(exportCounter.sumThenReset()).isEqualTo(0L); + + // Anomaly capture triggered - capture and update cache value + when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(411L); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(sampler.getTraceUsageCache().getIfPresent("TRACE_ID")) + .isEqualTo(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); + assertThat(exportCounter.sumThenReset()).isEqualTo(1L); + + // Boost condition triggered - capture span even though anomaly capture not included + when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(511L); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(sampler.getTraceUsageCache().getIfPresent("TRACE_ID")) + .isEqualTo(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); + assertThat(exportCounter.sumThenReset()).isEqualTo(1L); + + // Non-anomaly span - should still be captured since trace is anomalous overall + when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(200L); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(sampler.getTraceUsageCache().getIfPresent("TRACE_ID")) + .isEqualTo(AwsXrayAdaptiveSamplingConfig.UsageType.BOTH); + assertThat(exportCounter.sumThenReset()).isEqualTo(1L); + } + + @Test + void operationFilteringInAdaptSampling() { + SamplingRule rule1 = + SamplingRule.create( + Collections.emptyMap(), + 0.0, + "*", + "*", + 1, + 0, + "*", + "*", + "test-rule", + "*", + "*", + "*", + 1, + SamplingRateBoost.create(1, 300)); + + TestClock clock = TestClock.create(); + // Error span capture should default to 1/s + AwsXrayAdaptiveSamplingConfig config = + AwsXrayAdaptiveSamplingConfig.builder() + .setVersion(1.0) + .setAnomalyConditions( + Arrays.asList( + AwsXrayAdaptiveSamplingConfig.AnomalyConditions.builder() + .setOperations(Arrays.asList("GET /api1", "GET /api2")) + .setErrorCodeRegex("^500$") + .setUsage(AwsXrayAdaptiveSamplingConfig.UsageType.ANOMALY_TRACE_CAPTURE) + .build())) + .build(); + XrayRulesSampler sampler = + new XrayRulesSampler( + "CLIENT_ID", + Resource.getDefault(), + clock, + Sampler.alwaysOn(), + Arrays.asList(rule1), + config); + + ReadableSpan readableSpanMock = mock(ReadableSpan.class); + when(readableSpanMock.getSpanContext()) + .thenReturn( + SpanContext.create( + "TRACE_ID", "SPAN_ID", TraceFlags.getDefault(), TraceState.getDefault())); + when(readableSpanMock.getLatencyNanos()).thenReturn(1L); + + SpanData spanDataMock = mock(SpanData.class); + Attributes attributesMock = mock(Attributes.class); + when(spanDataMock.getAttributes()).thenReturn(attributesMock); + when(attributesMock.get(HTTP_RESPONSE_STATUS_CODE)).thenReturn(500L); + + LongAdder exportCounter = new LongAdder(); + Consumer stubbedConsumer = x -> exportCounter.increment(); + + // Test matching operations + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); + when(attributesMock.get(URL_PATH)).thenReturn("/api1/ext"); + when(attributesMock.get(HTTP_METHOD)).thenReturn("GET"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + + clock.advance(Duration.ofSeconds(5)); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); + when(attributesMock.get(URL_PATH)).thenReturn("/api2"); + when(attributesMock.get(HTTP_METHOD)).thenReturn("GET"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sumThenReset()).isEqualTo(2L); + + // Not enough time elapsed, error rate limit was hit + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID3"); + when(attributesMock.get(URL_PATH)).thenReturn("/api2"); + when(attributesMock.get(HTTP_METHOD)).thenReturn("GET"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sumThenReset()).isEqualTo(0L); + + // Test non-matching operation + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID4"); + when(attributesMock.get(URL_PATH)).thenReturn("/api1/ext"); + when(attributesMock.get(HTTP_METHOD)).thenReturn("POST"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID5"); + when(attributesMock.get(URL_PATH)).thenReturn("/non-matching"); + when(attributesMock.get(HTTP_METHOD)).thenReturn("GET"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sumThenReset()).isEqualTo(0L); + + // Test aws.local.operation takes priority + clock.advance(Duration.ofSeconds(5)); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID6"); + when(attributesMock.get(AwsAttributeKeys.AWS_LOCAL_OPERATION)).thenReturn("GET /api1"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sumThenReset()).isEqualTo(1L); + + // Test sending previously matched traceIDs gets captured + clock.advance(Duration.ofSeconds(5)); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID1"); + when(attributesMock.get(AwsAttributeKeys.AWS_LOCAL_OPERATION)).thenReturn("GET /non-matching"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + when(spanDataMock.getTraceId()).thenReturn("TRACE_ID2"); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sumThenReset()).isEqualTo(2L); + + // Test sending previously matched traceIDs gets captured as long as trace is active + clock.advance(Duration.ofSeconds(45)); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sumThenReset()).isEqualTo(1L); + clock.advance(Duration.ofSeconds(45)); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sumThenReset()).isEqualTo(1L); + clock.advance(Duration.ofSeconds(45)); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sumThenReset()).isEqualTo(1L); + + // Test sending non-matching trace after expire-time elapses + clock.advance(Duration.ofMinutes(100)); + sampler.adaptSampling(readableSpanMock, spanDataMock, stubbedConsumer); + assertThat(exportCounter.sumThenReset()).isEqualTo(0L); + } + private static SamplingResult doSample(Sampler sampler, String name) { return sampler.shouldSample( Context.current(), diff --git a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java index 283e3b3c9..cf0cb0728 100644 --- a/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java +++ b/aws-xray/src/test/java/io/opentelemetry/contrib/awsxray/XraySamplerClientTest.java @@ -126,7 +126,8 @@ void getSamplingTargets() throws Exception { .setRequestCount(10500) .setSampledCount(31) .setBorrowCount(0) - .build())); + .build()), + Collections.emptyList()); GetSamplingTargetsResponse response = client.getSamplingTargets(samplingTargetsRequest); AggregatedHttpRequest request = server.takeRequest().request(); @@ -174,7 +175,8 @@ void getSamplingTargets_malformed() { assertThatThrownBy( () -> client.getSamplingTargets( - GetSamplingTargetsRequest.create(Collections.emptyList()))) + GetSamplingTargetsRequest.create( + Collections.emptyList(), Collections.emptyList()))) .isInstanceOf(UncheckedIOException.class) .hasMessage("Failed to deserialize response."); } diff --git a/aws-xray/src/test/resources/sampling-rule-boost.json b/aws-xray/src/test/resources/sampling-rule-boost.json new file mode 100644 index 000000000..32752d5e5 --- /dev/null +++ b/aws-xray/src/test/resources/sampling-rule-boost.json @@ -0,0 +1,22 @@ +{ + "RuleName": "Test", + "RuleARN": "arn:aws:xray:us-east-1:595986152929:sampling-rule/Test", + "ResourceARN": "arn:aws:xray:us-east-1:595986152929:my-service", + "Priority": 1, + "FixedRate": 0.0, + "ReservoirSize": 0, + "ServiceName": "*", + "ServiceType": "*", + "Host": "*", + "HTTPMethod": "*", + "URLPath": "*", + "Version": 1, + "SamplingRateBoost": { + "MaxRate": 0.2, + "CooldownWindowMinutes": 300 + }, + "Attributes": { + "animal": "cat", + "speed": "0" + } +} diff --git a/disk-buffering/build.gradle.kts b/disk-buffering/build.gradle.kts index 0a883cf73..89baa6721 100644 --- a/disk-buffering/build.gradle.kts +++ b/disk-buffering/build.gradle.kts @@ -63,6 +63,10 @@ tasks.named("shadowJar") { mustRunAfter("jar") } +tasks.withType().configureEach { + dependsOn("shadowJar") +} + // The javadoc from wire's generated classes has errors that make the task that generates the "javadoc" artifact to fail. This // makes the javadoc task to ignore those generated classes. tasks.withType(Javadoc::class.java) {