Skip to content

Commit

Permalink
Merge pull request #6086 from DataDog/jpbempel/fix-condition-sampling
Browse files Browse the repository at this point in the history
Fix sampling with probe condition
  • Loading branch information
jpbempel authored Oct 26, 2023
2 parents 1d3d236 + 27e2e9b commit 703c18d
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public Status evaluate(
this.thisClassName = thisClassName;
boolean shouldEvaluate = resolveEvaluateAt(probeImplementation, methodLocation);
if (shouldEvaluate) {
probeImplementation.evaluate(this, status);
probeImplementation.evaluate(this, status, methodLocation);
}
return status;
}
Expand All @@ -340,11 +340,7 @@ private static boolean resolveEvaluateAt(
// line probe, no evaluation of probe's evaluateAt
return true;
}
MethodLocation localEvaluateAt = probeImplementation.getEvaluateAt();
if (methodLocation == MethodLocation.ENTRY) {
return localEvaluateAt == MethodLocation.DEFAULT || localEvaluateAt == MethodLocation.ENTRY;
}
return localEvaluateAt == methodLocation;
return MethodLocation.isSame(methodLocation, probeImplementation.getEvaluateAt());
}

public Status getStatus(String probeId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,19 +191,9 @@ public static DebuggerSpan createSpan(String operationName, String[] tags) {
*
* @return true if can proceed to capture data
*/
public static boolean isReadyToCapture(String... probeIds) {
// TODO provide overloaded version without string array
public static boolean isReadyToCapture(Class<?> callingClass, String... probeIds) {
try {
if (probeIds == null || probeIds.length == 0) {
return false;
}
boolean result = false;
for (String probeId : probeIds) {
// if all probes are rate limited, we don't capture
result |= ProbeRateLimiter.tryProbe(probeId);
}
result = result && checkAndSetInProbe();
return result;
return checkAndSetInProbe();
} catch (Exception ex) {
LOGGER.debug("Error in isReadyToCapture: ", ex);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,12 @@
public enum MethodLocation {
DEFAULT,
ENTRY,
EXIT
EXIT;

public static boolean isSame(MethodLocation methodLocation, MethodLocation evaluateAt) {
if (methodLocation == MethodLocation.ENTRY) {
return evaluateAt == MethodLocation.DEFAULT || evaluateAt == MethodLocation.ENTRY;
}
return methodLocation == evaluateAt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ public interface ProbeImplementation {

String getStrTags();

void evaluate(CapturedContext context, CapturedContext.Status status);
void evaluate(
CapturedContext context, CapturedContext.Status status, MethodLocation methodLocation);

void commit(
CapturedContext entryContext,
Expand Down Expand Up @@ -84,7 +85,8 @@ public String getStrTags() {
}

@Override
public void evaluate(CapturedContext context, CapturedContext.Status status) {}
public void evaluate(
CapturedContext context, CapturedContext.Status status, MethodLocation methodLocation) {}

@Override
public void commit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.time.temporal.ChronoUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.DoubleFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -23,6 +24,7 @@ public class ProbeRateLimiter {
new ConcurrentHashMap<>();
private static Sampler GLOBAL_SNAPSHOT_SAMPLER = createSampler(DEFAULT_GLOBAL_SNAPSHOT_RATE);
private static Sampler GLOBAL_LOG_SAMPLER = createSampler(DEFAULT_GLOBAL_LOG_RATE);
private static DoubleFunction<Sampler> samplerSupplier = ProbeRateLimiter::createSampler;

public static boolean tryProbe(String probeId) {
RateLimitInfo rateLimitInfo =
Expand All @@ -37,19 +39,19 @@ public static boolean tryProbe(String probeId) {

private static RateLimitInfo getDefaultRateLimitInfo(String probeId) {
LOGGER.debug("Setting sampling with default snapshot rate for probeId={}", probeId);
return new RateLimitInfo(createSampler(DEFAULT_SNAPSHOT_RATE), true);
return new RateLimitInfo(samplerSupplier.apply(DEFAULT_SNAPSHOT_RATE), true);
}

public static void setRate(String probeId, double rate, boolean isCaptureSnapshot) {
PROBE_SAMPLERS.put(probeId, new RateLimitInfo(createSampler(rate), isCaptureSnapshot));
PROBE_SAMPLERS.put(probeId, new RateLimitInfo(samplerSupplier.apply(rate), isCaptureSnapshot));
}

public static void setGlobalSnapshotRate(double rate) {
GLOBAL_SNAPSHOT_SAMPLER = createSampler(rate);
GLOBAL_SNAPSHOT_SAMPLER = samplerSupplier.apply(rate);
}

public static void setGlobalLogRate(double rate) {
GLOBAL_LOG_SAMPLER = createSampler(rate);
GLOBAL_LOG_SAMPLER = samplerSupplier.apply(rate);
}

public static void resetRate(String probeId) {
Expand All @@ -65,6 +67,11 @@ public static void resetAll() {
resetGlobalRate();
}

public static void setSamplerSupplier(DoubleFunction<Sampler> samplerSupplier) {
ProbeRateLimiter.samplerSupplier =
samplerSupplier != null ? samplerSupplier : ProbeRateLimiter::createSampler;
}

private static Sampler createSampler(double rate) {
if (rate < 0) {
return new ConstantSampler(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,6 @@ private void applyRateLimiter(ConfigurationComparer changes) {
: getDefaultRateLimitPerProbe(probe),
probe.isCaptureSnapshot());
}
if (addedDefinitions instanceof SpanDecorationProbe) {
// Span decoration probes use the same instrumentation as log probes, but we don't want
// to sample here.
ProbeRateLimiter.setRate(addedDefinitions.getId(), -1, false);
}
}
// remove rate for all removed probes
for (ProbeDefinition removedDefinition : changes.getRemovedDefinitions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,16 @@ private boolean addLineCaptures(LineMap lineMap) {
}
if (beforeLabel != null) {
InsnList insnList = new InsnList();
ldc(insnList, Type.getObjectType(classNode.name));
// stack [class, array]
pushProbesIds(insnList);
// stack [array]
invokeStatic(
insnList,
DEBUGGER_CONTEXT_TYPE,
"isReadyToCapture",
Type.BOOLEAN_TYPE,
CLASS_TYPE,
STRING_ARRAY_TYPE);
// stack [boolean]
LabelNode targetNode = new LabelNode();
Expand Down Expand Up @@ -314,10 +317,17 @@ private void instrumentMethodEnter() {
methodNode.instructions.insert(methodEnterLabel, insnList);
return;
}
ldc(insnList, Type.getObjectType(classNode.name));
// stack [class]
pushProbesIds(insnList);
// stack [array]
// stack [class, array]
invokeStatic(
insnList, DEBUGGER_CONTEXT_TYPE, "isReadyToCapture", Type.BOOLEAN_TYPE, STRING_ARRAY_TYPE);
insnList,
DEBUGGER_CONTEXT_TYPE,
"isReadyToCapture",
Type.BOOLEAN_TYPE,
CLASS_TYPE,
STRING_ARRAY_TYPE);
// stack [boolean]
LabelNode targetNode = new LabelNode();
LabelNode gotoNode = new LabelNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,24 +363,47 @@ public InstrumentationResult.Status instrument(
}

@Override
public void evaluate(CapturedContext context, CapturedContext.Status status) {
public void evaluate(
CapturedContext context, CapturedContext.Status status, MethodLocation methodLocation) {
if (!(status instanceof LogStatus)) {
throw new IllegalStateException("Invalid status: " + status.getClass());
}

LogStatus logStatus = (LogStatus) status;
if (!hasCondition()) {
// sample when no condition associated
sample(logStatus, methodLocation);
}
logStatus.setCondition(evaluateCondition(context, logStatus));
CapturedContext.CapturedThrowable throwable = context.getThrowable();
if (logStatus.hasConditionErrors() && throwable != null) {
logStatus.addError(
new EvaluationError(
"uncaught exception", throwable.getType() + ": " + throwable.getMessage()));
}
if (logStatus.getCondition()) {
if (hasCondition() && logStatus.getCondition()) {
// sample if probe has condition and condition is true
sample(logStatus, methodLocation);
}
if (logStatus.isSampled() && logStatus.getCondition()) {
LogMessageTemplateBuilder logMessageBuilder = new LogMessageTemplateBuilder(segments);
logStatus.setMessage(logMessageBuilder.evaluate(context, logStatus));
}
}

private void sample(LogStatus logStatus, MethodLocation methodLocation) {
// sample only once and when we need to evaluate
if (!MethodLocation.isSame(methodLocation, evaluateAt)) {
return;
}
boolean sampled = ProbeRateLimiter.tryProbe(id);
logStatus.setSampled(sampled);
if (!sampled) {
LOGGER.debug("{} not sampled!", id);
DebuggerAgent.getSink().skipSnapshot(id, DebuggerContext.SkipCause.RATE);
}
}

private boolean evaluateCondition(CapturedContext capture, LogStatus status) {
if (probeCondition == null) {
return true;
Expand Down Expand Up @@ -430,13 +453,6 @@ public void commit(
int maxDepth = capture != null ? capture.maxReferenceDepth : -1;
Snapshot snapshot = new Snapshot(Thread.currentThread(), this, maxDepth);
if (entryStatus.shouldSend() && exitStatus.shouldSend()) {
// only rate limit if a condition is defined
if (probeCondition != null) {
if (!ProbeRateLimiter.tryProbe(id)) {
sink.skipSnapshot(id, DebuggerContext.SkipCause.RATE);
return;
}
}
snapshot.setTraceId(traceId);
snapshot.setSpanId(spanId);
if (isCaptureSnapshot()) {
Expand Down Expand Up @@ -506,13 +522,6 @@ public void commit(CapturedContext lineContext, int line) {
Snapshot snapshot = new Snapshot(Thread.currentThread(), this, maxDepth);
boolean shouldCommit = false;
if (status.shouldSend()) {
// only rate limit if a condition is defined
if (probeCondition != null) {
if (!ProbeRateLimiter.tryProbe(id)) {
sink.skipSnapshot(id, DebuggerContext.SkipCause.RATE);
return;
}
}
snapshot.setTraceId(lineContext.getTraceId());
snapshot.setSpanId(lineContext.getSpanId());
if (isCaptureSnapshot()) {
Expand Down Expand Up @@ -554,6 +563,7 @@ public static class LogStatus extends CapturedContext.Status {
private boolean condition = true;
private boolean hasLogTemplateErrors;
private boolean hasConditionErrors;
private boolean sampled = true;
private String message;

public LogStatus(ProbeImplementation probeImplementation) {
Expand All @@ -567,7 +577,7 @@ private LogStatus(ProbeImplementation probeImplementation, boolean condition) {

@Override
public boolean shouldFreezeContext() {
return probeImplementation.isCaptureSnapshot() && shouldSend();
return sampled && probeImplementation.isCaptureSnapshot() && shouldSend();
}

@Override
Expand All @@ -576,7 +586,7 @@ public boolean isCapturing() {
}

public boolean shouldSend() {
return condition && !hasConditionErrors;
return sampled && condition && !hasConditionErrors;
}

public boolean shouldReportError() {
Expand Down Expand Up @@ -614,6 +624,14 @@ public void setMessage(String message) {
public String getMessage() {
return message;
}

public void setSampled(boolean sampled) {
this.sampled = sampled;
}

public boolean isSampled() {
return sampled;
}
}

@Generated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public ProbeLocation getLocation() {
}

@Override
public void evaluate(CapturedContext context, CapturedContext.Status status) {}
public void evaluate(
CapturedContext context, CapturedContext.Status status, MethodLocation methodLocation) {}

@Override
public void commit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public InstrumentationResult.Status instrument(
}

@Override
public void evaluate(CapturedContext context, CapturedContext.Status status) {
public void evaluate(
CapturedContext context, CapturedContext.Status status, MethodLocation methodLocation) {
for (Decoration decoration : decorations) {
if (decoration.when != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.datadog.debugger.util.SerializerWithLimits;
import com.squareup.moshi.JsonAdapter;
import datadog.trace.api.Config;
import datadog.trace.api.sampling.Sampler;
import datadog.trace.bootstrap.debugger.*;
import datadog.trace.bootstrap.debugger.el.ValueReferences;
import datadog.trace.bootstrap.debugger.util.Redaction;
Expand Down Expand Up @@ -1805,6 +1806,45 @@ public void typeRedactionCondition() throws IOException, URISyntaxException {
snapshots.get(2).getEvaluationErrors().get(0).getMessage());
}

@Test
public void samplingMethodProbe() throws IOException, URISyntaxException {
doSamplingTest(this::methodProbe, 1, 1);
}

@Test
public void samplingProbeCondition() throws IOException, URISyntaxException {
doSamplingTest(this::simpleConditionTest, 1, 1);
}

@Test
public void samplingDupMethodProbeCondition() throws IOException, URISyntaxException {
doSamplingTest(this::mergedProbesWithAdditionalProbeConditionTest, 2, 2);
}

@Test
public void samplingLineProbe() throws IOException, URISyntaxException {
doSamplingTest(this::singleLineProbe, 1, 1);
}

interface TestMethod {
void run() throws IOException, URISyntaxException;
}

private void doSamplingTest(TestMethod testRun, int expectedGlobalCount, int expectedProbeCount)
throws IOException, URISyntaxException {
MockSampler probeSampler = new MockSampler();
MockSampler globalSampler = new MockSampler();
ProbeRateLimiter.setSamplerSupplier(rate -> rate < 101 ? probeSampler : globalSampler);
ProbeRateLimiter.setGlobalSnapshotRate(1000);
try {
testRun.run();
} finally {
ProbeRateLimiter.setSamplerSupplier(null);
}
assertEquals(expectedGlobalCount, globalSampler.callCount);
assertEquals(expectedProbeCount, probeSampler.callCount);
}

private DebuggerTransformerTest.TestSnapshotListener setupInstrumentTheWorldTransformer(
String excludeFileName) {
Config config = mock(Config.class);
Expand Down Expand Up @@ -2141,6 +2181,27 @@ public void instrumentationResult(ProbeDefinition definition, InstrumentationRes
}
}

static class MockSampler implements Sampler {

int callCount;

@Override
public boolean sample() {
callCount++;
return true;
}

@Override
public boolean keep() {
return false;
}

@Override
public boolean drop() {
return false;
}
}

static class KotlinHelper {
public static Class<?> compileAndLoad(
String className, String sourceFileName, List<File> outputFilesToDelete) {
Expand Down

0 comments on commit 703c18d

Please sign in to comment.