Skip to content

Commit

Permalink
Don't recompute DSM pathway hash for known tags (#7307)
Browse files Browse the repository at this point in the history
  • Loading branch information
vandonr authored Aug 7, 2024
1 parent facbcfa commit 7059a72
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package datadog.trace.core.datastreams;

import datadog.trace.api.TraceConfig;
import datadog.trace.api.WellKnownTags;
import datadog.trace.api.time.TimeSource;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import datadog.trace.bootstrap.instrumentation.api.TagContext;
Expand All @@ -12,17 +11,17 @@ public class DataStreamContextExtractor implements HttpCodec.Extractor {
private final HttpCodec.Extractor delegate;
private final TimeSource timeSource;
private final Supplier<TraceConfig> traceConfigSupplier;
private final WellKnownTags wellKnownTags;
private final long hashOfKnownTags;

public DataStreamContextExtractor(
HttpCodec.Extractor delegate,
TimeSource timeSource,
Supplier<TraceConfig> traceConfigSupplier,
WellKnownTags wellKnownTags) {
long hashOfKnownTags) {
this.delegate = delegate;
this.timeSource = timeSource;
this.traceConfigSupplier = traceConfigSupplier;
this.wellKnownTags = wellKnownTags;
this.hashOfKnownTags = hashOfKnownTags;
}

@Override
Expand All @@ -38,15 +37,15 @@ public <C> TagContext extract(C carrier, AgentPropagation.ContextVisitor<C> gett

if (shouldExtractPathwayContext) {
DefaultPathwayContext pathwayContext =
DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.wellKnownTags);
DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.hashOfKnownTags);

extracted.withPathwayContext(pathwayContext);
}

return extracted;
} else if (traceConfigSupplier.get().isDataStreamsEnabled()) {
DefaultPathwayContext pathwayContext =
DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.wellKnownTags);
DefaultPathwayContext.extract(carrier, getter, this.timeSource, this.hashOfKnownTags);

if (pathwayContext != null) {
extracted = new TagContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
private final DatastreamsPayloadWriter payloadWriter;
private final DDAgentFeaturesDiscovery features;
private final TimeSource timeSource;
private final WellKnownTags wellKnownTags;
private final long hashOfKnownTags;
private final Supplier<TraceConfig> traceConfigSupplier;
private final long bucketDurationNanos;
private final DataStreamContextInjector injector;
Expand Down Expand Up @@ -124,7 +124,7 @@ public DefaultDataStreamsMonitoring(
this.features = features;
this.timeSource = timeSource;
this.traceConfigSupplier = traceConfigSupplier;
this.wellKnownTags = wellKnownTags;
this.hashOfKnownTags = DefaultPathwayContext.getBaseHash(wellKnownTags);
this.payloadWriter = payloadWriter;
this.bucketDurationNanos = bucketDurationNanos;
this.injector = new DataStreamContextInjector(this);
Expand Down Expand Up @@ -189,15 +189,16 @@ public void setProduceCheckpoint(String type, String target) {
@Override
public PathwayContext newPathwayContext() {
if (configSupportsDataStreams) {
return new DefaultPathwayContext(timeSource, wellKnownTags);
return new DefaultPathwayContext(timeSource, hashOfKnownTags);
} else {
return AgentTracer.NoopPathwayContext.INSTANCE;
}
}

@Override
public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) {
return new DataStreamContextExtractor(delegate, timeSource, traceConfigSupplier, wellKnownTags);
return new DataStreamContextExtractor(
delegate, timeSource, traceConfigSupplier, hashOfKnownTags);
}

@Override
Expand All @@ -213,7 +214,7 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie
carrier,
DataStreamsContextCarrierAdapter.INSTANCE,
this.timeSource,
this.wellKnownTags);
this.hashOfKnownTags);
((DDSpan) span).context().mergePathwayContext(pathwayContext);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class DefaultPathwayContext implements PathwayContext {
private static final Logger log = LoggerFactory.getLogger(DefaultPathwayContext.class);
private final Lock lock = new ReentrantLock();
private final WellKnownTags wellKnownTags;
private final long hashOfKnownTags;
private final TimeSource timeSource;
private final GrowingByteArrayOutput outputBuffer =
GrowingByteArrayOutput.withInitialCapacity(20);
Expand Down Expand Up @@ -68,20 +68,19 @@ public class DefaultPathwayContext implements PathwayContext {
TagsProcessor.DATASET_NAMESPACE_TAG,
TagsProcessor.MANUAL_TAG));

public DefaultPathwayContext(TimeSource timeSource, WellKnownTags wellKnownTags) {
public DefaultPathwayContext(TimeSource timeSource, long hashOfKnownTags) {
this.timeSource = timeSource;
this.wellKnownTags = wellKnownTags;
this.hashOfKnownTags = hashOfKnownTags;
}

private DefaultPathwayContext(
TimeSource timeSource,
WellKnownTags wellKnownTags,
long hashOfKnownTags,
long pathwayStartNanos,
long pathwayStartNanoTicks,
long edgeStartNanoTicks,
long hash) {
this.timeSource = timeSource;
this.wellKnownTags = wellKnownTags;
this(timeSource, hashOfKnownTags);
this.pathwayStartNanos = pathwayStartNanos;
this.pathwayStartNanoTicks = pathwayStartNanoTicks;
this.edgeStartNanoTicks = edgeStartNanoTicks;
Expand Down Expand Up @@ -127,7 +126,7 @@ public void setCheckpoint(
// So far, each tag key has only one tag value, so we're initializing the capacity to match
// the number of tag keys for now. We should revisit this later if it's no longer the case.
List<String> allTags = new ArrayList<>(sortedTags.size());
PathwayHashBuilder pathwayHashBuilder = new PathwayHashBuilder(wellKnownTags);
PathwayHashBuilder pathwayHashBuilder = new PathwayHashBuilder(hashOfKnownTags);
DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder();

if (!started) {
Expand Down Expand Up @@ -272,19 +271,19 @@ public String toString() {

private static class PathwayContextExtractor implements AgentPropagation.KeyClassifier {
private final TimeSource timeSource;
private final WellKnownTags wellKnownTags;
private final long hashOfKnownTags;
private DefaultPathwayContext extractedContext;

PathwayContextExtractor(TimeSource timeSource, WellKnownTags wellKnownTags) {
PathwayContextExtractor(TimeSource timeSource, long hashOfKnownTags) {
this.timeSource = timeSource;
this.wellKnownTags = wellKnownTags;
this.hashOfKnownTags = hashOfKnownTags;
}

@Override
public boolean accept(String key, String value) {
if (PathwayContext.PROPAGATION_KEY_BASE64.equalsIgnoreCase(key)) {
try {
extractedContext = strDecode(timeSource, wellKnownTags, value);
extractedContext = strDecode(timeSource, hashOfKnownTags, value);
} catch (IOException e) {
return false;
}
Expand All @@ -296,28 +295,28 @@ public boolean accept(String key, String value) {
private static class BinaryPathwayContextExtractor
implements AgentPropagation.BinaryKeyClassifier {
private final TimeSource timeSource;
private final WellKnownTags wellKnownTags;
private final long hashOfKnownTags;
private DefaultPathwayContext extractedContext;

BinaryPathwayContextExtractor(TimeSource timeSource, WellKnownTags wellKnownTags) {
BinaryPathwayContextExtractor(TimeSource timeSource, long hashOfKnownTags) {
this.timeSource = timeSource;
this.wellKnownTags = wellKnownTags;
this.hashOfKnownTags = hashOfKnownTags;
}

@Override
public boolean accept(String key, byte[] value) {
// older versions support, should be removed in the future
if (PathwayContext.PROPAGATION_KEY.equalsIgnoreCase(key)) {
try {
extractedContext = decode(timeSource, wellKnownTags, value);
extractedContext = decode(timeSource, hashOfKnownTags, value);
} catch (IOException e) {
return false;
}
}

if (PathwayContext.PROPAGATION_KEY_BASE64.equalsIgnoreCase(key)) {
try {
extractedContext = base64Decode(timeSource, wellKnownTags, value);
extractedContext = base64Decode(timeSource, hashOfKnownTags, value);
} catch (IOException e) {
return false;
}
Expand All @@ -330,13 +329,13 @@ static <C> DefaultPathwayContext extract(
C carrier,
AgentPropagation.ContextVisitor<C> getter,
TimeSource timeSource,
WellKnownTags wellKnownTags) {
long hashOfKnownTags) {
if (getter instanceof AgentPropagation.BinaryContextVisitor) {
return extractBinary(
carrier, (AgentPropagation.BinaryContextVisitor) getter, timeSource, wellKnownTags);
carrier, (AgentPropagation.BinaryContextVisitor) getter, timeSource, hashOfKnownTags);
}
PathwayContextExtractor pathwayContextExtractor =
new PathwayContextExtractor(timeSource, wellKnownTags);
new PathwayContextExtractor(timeSource, hashOfKnownTags);
getter.forEachKey(carrier, pathwayContextExtractor);
if (pathwayContextExtractor.extractedContext == null) {
log.debug("No context extracted");
Expand All @@ -350,9 +349,9 @@ static <C> DefaultPathwayContext extractBinary(
C carrier,
AgentPropagation.BinaryContextVisitor<C> getter,
TimeSource timeSource,
WellKnownTags wellKnownTags) {
long hashOfKnownTags) {
BinaryPathwayContextExtractor pathwayContextExtractor =
new BinaryPathwayContextExtractor(timeSource, wellKnownTags);
new BinaryPathwayContextExtractor(timeSource, hashOfKnownTags);
getter.forEachKey(carrier, pathwayContextExtractor);
if (pathwayContextExtractor.extractedContext == null) {
log.debug("No context extracted");
Expand All @@ -363,18 +362,18 @@ static <C> DefaultPathwayContext extractBinary(
}

private static DefaultPathwayContext strDecode(
TimeSource timeSource, WellKnownTags wellKnownTags, String data) throws IOException {
TimeSource timeSource, long hashOfKnownTags, String data) throws IOException {
byte[] base64Bytes = data.getBytes(UTF_8);
return base64Decode(timeSource, wellKnownTags, base64Bytes);
return base64Decode(timeSource, hashOfKnownTags, base64Bytes);
}

private static DefaultPathwayContext base64Decode(
TimeSource timeSource, WellKnownTags wellKnownTags, byte[] data) throws IOException {
return decode(timeSource, wellKnownTags, Base64.getDecoder().decode(data));
TimeSource timeSource, long hashOfKnownTags, byte[] data) throws IOException {
return decode(timeSource, hashOfKnownTags, Base64.getDecoder().decode(data));
}

private static DefaultPathwayContext decode(
TimeSource timeSource, WellKnownTags wellKnownTags, byte[] data) throws IOException {
TimeSource timeSource, long hashOfKnownTags, byte[] data) throws IOException {
ByteArrayInput input = ByteArrayInput.wrap(data);

long hash = input.readLongLE();
Expand All @@ -394,7 +393,7 @@ private static DefaultPathwayContext decode(

return new DefaultPathwayContext(
timeSource,
wellKnownTags,
hashOfKnownTags,
pathwayStartNanos,
pathwayStartNanoTicks,
edgeStartNanoTicks,
Expand All @@ -411,35 +410,35 @@ public long addValue(String val) {
}

private static class PathwayHashBuilder {
private final StringBuilder builder;

public PathwayHashBuilder(WellKnownTags wellKnownTags) {
builder = new StringBuilder();
builder.append(wellKnownTags.getService());
builder.append(wellKnownTags.getEnv());
private long hash;

String primaryTag = Config.get().getPrimaryTag();
if (primaryTag != null) {
builder.append(primaryTag);
}
public PathwayHashBuilder(long baseHash) {
hash = baseHash;
}

public void addTag(String tag) {
builder.append(tag);
hash = FNV64Hash.continueHash(hash, tag, FNV64Hash.Version.v1);
}

public long generateHash() {
return FNV64Hash.generateHash(builder.toString(), FNV64Hash.Version.v1);
public long getHash() {
return hash;
}
}

@Override
public String toString() {
return builder.toString();
public static long getBaseHash(WellKnownTags wellKnownTags) {
StringBuilder builder = new StringBuilder();
builder.append(wellKnownTags.getService());
builder.append(wellKnownTags.getEnv());

String primaryTag = Config.get().getPrimaryTag();
if (primaryTag != null) {
builder.append(primaryTag);
}
return FNV64Hash.generateHash(builder.toString(), FNV64Hash.Version.v1);
}

private long generateNodeHash(PathwayHashBuilder pathwayHashBuilder) {
return pathwayHashBuilder.generateHash();
return pathwayHashBuilder.getHash();
}

private long generatePathwayHash(long nodeHash, long parentHash) {
Expand Down
Loading

0 comments on commit 7059a72

Please sign in to comment.