Skip to content

Commit

Permalink
Fix Protobuf schema sampling logic (#7197)
Browse files Browse the repository at this point in the history
* fix schema sampling logic
* Capture serialization in DynamicMessages for Protobuf
  • Loading branch information
piochelepiotr committed Jun 20, 2024
1 parent 9bf5bbd commit 174ea28
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.instrumentation.kafka_clients;

import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.util.FNV64Hash;
Expand All @@ -9,7 +10,11 @@

public class AvroSchemaExtractor {
public static void tryExtractProducer(ProducerRecord record, AgentSpan span) {
Integer prio = span.getSamplingPriority();
AgentDataStreamsMonitoring dsm = AgentTracer.get().getDataStreamsMonitoring();
if (!dsm.canSampleSchema(record.topic())) {
return;
}
Integer prio = span.forceSamplingDecision();
if (prio == null || prio <= 0) {
// don't extract schema if span is not sampled
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public String hierarchyMarkerType() {
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return declaresMethod(named("writeTo"))
.and(extendsClass(named(hierarchyMarkerType())))
.and(not(nameStartsWith("com.google.protobuf")));
.and(
not(nameStartsWith("com.google.protobuf"))
.or(named("com.google.protobuf.DynamicMessage")));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,19 @@ public SchemaSampler() {
}

/**
 * Takes a sample if at least {@code SAMPLE_INTERVAL_MILLIS} have elapsed since the last one.
 *
 * <p>This method does not increment the weight itself; the weight is accumulated by
 * {@link #canSample(long)}, which callers invoke before sampling.
 *
 * @param currentTimeMillis the current time, in milliseconds
 * @return the weight accumulated since the previous sample when a sample is taken (the counter
 *     is reset to zero), or {@code 0} when the sampling interval has not elapsed yet
 */
public int trySample(long currentTimeMillis) {
  if (currentTimeMillis >= lastSampleMillis + SAMPLE_INTERVAL_MILLIS) {
    synchronized (this) {
      // Re-check under the lock: another caller may have sampled between the first check and
      // acquiring the monitor.
      if (currentTimeMillis >= lastSampleMillis + SAMPLE_INTERVAL_MILLIS) {
        lastSampleMillis = currentTimeMillis;
        // Read and reset in a single step so an increment arriving in between is not lost.
        return weight.getAndSet(0);
      }
    }
  }
  return 0;
}

/**
 * Records one more observation and reports whether the sampling interval has elapsed.
 *
 * <p>Every call increments the weight counter, whether or not sampling is currently possible.
 *
 * @param currentTimeMillis the current time, in milliseconds
 * @return {@code true} when {@code currentTimeMillis} is at or past the last sample time plus
 *     {@code SAMPLE_INTERVAL_MILLIS}
 */
public boolean canSample(long currentTimeMillis) {
  // Count this call first, then evaluate eligibility against the last sample time.
  weight.incrementAndGet();
  final long nextSampleMillis = lastSampleMillis + SAMPLE_INTERVAL_MILLIS;
  return currentTimeMillis >= nextSampleMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,18 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification {
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)

then:
// the first received schema is sampled, with a weight of one.
dataStreams.canSampleSchema("schema1")
dataStreams.trySampleSchema("schema1") == 1
// the sampling is done by topic, so a schema on a different topic will also be sampled at once, also with a weight of one.
dataStreams.canSampleSchema("schema2")
dataStreams.trySampleSchema("schema2") == 1
dataStreams.trySampleSchema("schema1") == 0
dataStreams.trySampleSchema("schema1") == 0
// no time has passed from the last sampling, so the same schema is not sampled again (two times in a row).
!dataStreams.canSampleSchema("schema1")
!dataStreams.canSampleSchema("schema1")
timeSource.advance(30*1e9 as long)
// now, 30 seconds have passed, so the schema is sampled again, with a weight of 3 (so it includes the two times the schema was not sampled).
dataStreams.canSampleSchema("schema1")
dataStreams.trySampleSchema("schema1") == 3
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,18 @@ class SchemaSamplerTest extends DDCoreSpecification {
boolean canSample1 = sampler.canSample(currentTimeMillis)
int weight1 = sampler.trySample(currentTimeMillis)
boolean canSample2= sampler.canSample(currentTimeMillis + 1000)
int weight2 = sampler.trySample(currentTimeMillis + 1000)
boolean canSample3 = sampler.canSample(currentTimeMillis + 2000)
int weight3 = sampler.trySample(currentTimeMillis + 2000)
boolean canSample4 = sampler.canSample(currentTimeMillis + 30000)
int weight4 = sampler.trySample(currentTimeMillis + 30000)
boolean canSample5 = sampler.canSample(currentTimeMillis + 30001)
int weight5 = sampler.trySample(currentTimeMillis + 30001)

then:
canSample1
weight1 == 1
!canSample2
weight2 == 0
!canSample3
weight3 == 0
canSample4
weight4 == 3
!canSample5
weight5 == 0
}
}

0 comments on commit 174ea28

Please sign in to comment.