Track payload size for Kafka integration (#6045)
vandonr committed Oct 31, 2023
1 parent 34c6955 commit 17c097d
Showing 21 changed files with 196 additions and 66 deletions.
@@ -143,7 +143,7 @@ public AgentSpan startSpan(
}
AgentPropagation.ContextVisitor<REQUEST_CARRIER> getter = getter();
if (null != carrier && null != getter) {
tracer().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0);
tracer().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0);
}
return span;
}
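Every setCheckpoint call site in this commit gains a fourth argument: the serialized payload size in bytes, where 0 means "not measured" (the HTTP, gRPC, SQS and RabbitMQ integrations all pass 0; only the Kafka integrations compute a real value). The monitoring interface itself is not part of this excerpt, but judging from the call sites and the DefaultDataStreamsMonitoring override further down, the extended method presumably looks roughly like the sketch below (placeholder name, not the verbatim interface):

import datadog.trace.bootstrap.instrumentation.api.AgentSpan; // package assumed from the bootstrap API
import java.util.LinkedHashMap;

// Placeholder sketch of the extended checkpoint API; the real interface is not shown in this diff.
interface DataStreamsMonitoringSketch {
  /**
   * @param defaultTimestamp fallback produce timestamp in milliseconds, or 0 to use the current time
   * @param payloadSizeBytes serialized payload size in bytes, or 0 when it cannot be measured
   */
  void setCheckpoint(
      AgentSpan span,
      LinkedHashMap<String, String> sortedTags,
      long defaultTimestamp,
      long payloadSizeBytes);
}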
@@ -68,7 +68,9 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
final AgentSpan span =
startSpan(DECORATE.instrumentationNames()[0], GRPC_SERVER, spanContext).setMeasured(true);

AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0);
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0);

RequestContext reqContext = span.getRequestContext();
if (reqContext != null) {
@@ -91,7 +91,7 @@ protected void startNewMessageSpan(Message message) {
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
sortedTags.put(TOPIC_TAG, urlFileName(queueUrl));
sortedTags.put(TYPE_TAG, "sqs");
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0);
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0, 0);

CONSUMER_DECORATE.afterStart(span);
CONSUMER_DECORATE.onConsume(span, queueUrl, requestId);
@@ -67,7 +67,9 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
CallbackProvider cbp = tracer.getCallbackProvider(RequestContextSlot.APPSEC);
final AgentSpan span = startSpan(GRPC_SERVER, spanContext).setMeasured(true);

AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0);
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0);

RequestContext reqContext = span.getRequestContext();
if (reqContext != null) {
@@ -47,7 +47,8 @@ public String[] helperClassNames() {
packageName + ".TracingIterable",
packageName + ".TracingIterator",
packageName + ".TracingList",
packageName + ".TracingListIterator"
packageName + ".TracingListIterator",
packageName + ".Utils"
};
}

@@ -13,6 +13,7 @@
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_DELIVER;
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
import static datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter.GETTER;
import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import datadog.trace.api.Config;
@@ -92,7 +93,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, val.timestamp());
.setCheckpoint(span, sortedTags, val.timestamp(), computePayloadSizeBytes(val));
} else {
span = startSpan(operationName, null);
}
@@ -0,0 +1,21 @@
package datadog.trace.instrumentation.kafka_clients;

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

public final class Utils {
private Utils() {} // prevent instantiation

// this method is used in kafka-clients and kafka-streams instrumentations
public static long computePayloadSizeBytes(ConsumerRecord<?, ?> val) {
long headersSize = 0;
Headers headers = val.headers();
if (headers != null)
for (Header h : headers) {
headersSize += h.value().length + h.key().getBytes(StandardCharsets.UTF_8).length;
}
return headersSize + val.serializedKeySize() + val.serializedValueSize();
}
}
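The helper sums header keys and values (keys are UTF-8 encoded to count bytes) plus the serialized key and value sizes of the record. It is injected into the instrumented application's classloader rather than called by user code, but a minimal sketch of what it computes, assuming a plain kafka-clients consumer and an illustrative topic and bootstrap address, would be:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import datadog.trace.instrumentation.kafka_clients.Utils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PayloadSizeDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // illustrative address
    props.put("group.id", "payload-size-demo");       // illustrative group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("testTopic"));
      for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(1))) {
        // headers + serialized key + serialized value, as computed by the new helper
        long bytes = Utils.computePayloadSizeBytes(rec);
        System.out.println(rec.topic() + "/" + rec.offset() + " -> " + bytes + " bytes");
      }
    }
  }
}

Note that ConsumerRecord reports serializedKeySize() and serializedValueSize() as -1 when the key or value is null, so records with a null key contribute slightly less than their true size under this formula.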
@@ -9,6 +9,7 @@
import static datadog.trace.core.datastreams.TagsProcessor.GROUP_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes;
import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.BROKER_DECORATE;
import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE;
import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.KAFKA_CONSUME;
@@ -36,6 +37,7 @@
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -59,6 +61,7 @@ public String instrumentedType() {
public String[] helperClassNames() {
return new String[] {
"datadog.trace.instrumentation.kafka_clients.TracingIterableDelegator",
"datadog.trace.instrumentation.kafka_clients.Utils",
packageName + ".KafkaStreamsDecorator",
packageName + ".ProcessorRecordContextVisitor",
packageName + ".StampedRecordContextVisitor",
@@ -244,7 +247,8 @@ public static void start(
sortedTags.put(TYPE_TAG, "kafka");
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, record.timestamp);
.setCheckpoint(
span, sortedTags, record.timestamp, computePayloadSizeBytes(record.value));
} else {
span = startSpan(KAFKA_CONSUME, null);
}
@@ -307,9 +311,18 @@ public static void start(
}
sortedTags.put(TOPIC_TAG, record.topic());
sortedTags.put(TYPE_TAG, "kafka");

long payloadSize = 0;
// we have to go through Object to get the RecordMetadata here because the class of `record`
// only implements it after 2.7 (and this class is only used if v >= 2.7)
if ((Object) record instanceof RecordMetadata) { // should always be true
RecordMetadata metadata = (RecordMetadata) (Object) record;
payloadSize = metadata.serializedKeySize() + metadata.serializedValueSize();
}

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, record.timestamp());
.setCheckpoint(span, sortedTags, record.timestamp(), payloadSize);
} else {
span = startSpan(KAFKA_CONSUME, null);
}
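As the comment above notes, the cast through Object is needed because the class of record only implements RecordMetadata from Kafka 2.7 onwards, and this advice is only applied on those versions. One property of that API worth keeping in mind (an observation, not part of this change): RecordMetadata.serializedKeySize() and serializedValueSize() return -1 when the key or value was null, so a defensive variant of the size computation might clamp the sentinels, e.g.:

import org.apache.kafka.clients.producer.RecordMetadata;

final class PayloadSizeFromMetadata {
  private PayloadSizeFromMetadata() {}

  // Sketch of a null-safe variant: treat the -1 "no key / no value" sentinel as 0 bytes.
  static long payloadSizeBytes(RecordMetadata metadata) {
    long keySize = Math.max(0, metadata.serializedKeySize());
    long valueSize = Math.max(0, metadata.serializedValueSize());
    return keySize + valueSize;
  }
}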
@@ -247,7 +247,9 @@ public static AgentScope startReceivingSpan(
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
sortedTags.put(TOPIC_TAG, queue);
sortedTags.put(TYPE_TAG, "rabbitmq");
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, produceMillis);
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, produceMillis, 0);
}

CONSUMER_DECORATE.afterStart(span);
@@ -52,9 +52,10 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
static final long DEFAULT_BUCKET_DURATION_NANOS = TimeUnit.SECONDS.toNanos(10);
static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5);

private static final StatsPoint REPORT = new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0);
private static final StatsPoint REPORT =
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0);
private static final StatsPoint POISON_PILL =
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0);
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0);

private final Map<Long, StatsBucket> timeToBucket = new HashMap<>();
private final BlockingQueue<InboxItem> inbox = new MpscBlockingConsumerArrayQueue<>(1024);
@@ -204,10 +205,13 @@ public void trackBacklog(LinkedHashMap<String, String> sortedTags, long value) {

@Override
public void setCheckpoint(
AgentSpan span, LinkedHashMap<String, String> sortedTags, long defaultTimestamp) {
AgentSpan span,
LinkedHashMap<String, String> sortedTags,
long defaultTimestamp,
long payloadSizeBytes) {
PathwayContext pathwayContext = span.context().getPathwayContext();
if (pathwayContext != null) {
pathwayContext.setCheckpoint(sortedTags, this::add, defaultTimestamp);
pathwayContext.setCheckpoint(sortedTags, this::add, defaultTimestamp, payloadSizeBytes);
if (pathwayContext.getHash() != 0) {
span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash()));
}
@@ -233,7 +237,7 @@ public void setConsumeCheckpoint(String type, String source, DataStreamsContextC
sortedTags.put(TOPIC_TAG, source);
sortedTags.put(TYPE_TAG, type);

setCheckpoint(span, sortedTags, 0);
setCheckpoint(span, sortedTags, 0, 0);
}

@Override
@@ -95,14 +95,23 @@ public long getHash() {
@Override
public void setCheckpoint(
LinkedHashMap<String, String> sortedTags, Consumer<StatsPoint> pointConsumer) {
setCheckpoint(sortedTags, pointConsumer, 0);
setCheckpoint(sortedTags, pointConsumer, 0, 0);
}

@Override
public void setCheckpoint(
LinkedHashMap<String, String> sortedTags,
Consumer<StatsPoint> pointConsumer,
long defaultTimestamp) {
setCheckpoint(sortedTags, pointConsumer, defaultTimestamp, 0);
}

@Override
public void setCheckpoint(
LinkedHashMap<String, String> sortedTags,
Consumer<StatsPoint> pointConsumer,
long defaultTimestamp,
long payloadSizeBytes) {
long startNanos = timeSource.getCurrentTimeNanos();
long nanoTicks = timeSource.getNanoTicks();
lock.lock();
@@ -167,7 +176,8 @@ public void setCheckpoint(
hash,
timeSource.getCurrentTimeNanos(),
pathwayLatencyNano,
edgeLatencyNano);
edgeLatencyNano,
payloadSizeBytes);
edgeStartNanoTicks = nanoTicks;
hash = newHash;

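The new four-argument setCheckpoint is added without touching existing callers: the two- and three-argument overloads simply delegate downwards, defaulting the missing values to 0. A stripped-down illustration of that telescoping-overload pattern (placeholder names, not the real classes):

import java.util.LinkedHashMap;
import java.util.function.Consumer;

// Placeholder illustration of the telescoping overloads used in DefaultPathwayContext above.
class CheckpointOverloadsSketch {
  void setCheckpoint(LinkedHashMap<String, String> tags, Consumer<Object> sink) {
    setCheckpoint(tags, sink, 0); // pre-existing two-argument callers keep compiling
  }

  void setCheckpoint(
      LinkedHashMap<String, String> tags, Consumer<Object> sink, long defaultTimestamp) {
    setCheckpoint(tags, sink, defaultTimestamp, 0); // payload size unknown -> 0
  }

  void setCheckpoint(
      LinkedHashMap<String, String> tags,
      Consumer<Object> sink,
      long defaultTimestamp,
      long payloadSizeBytes) {
    // the real implementation computes hashes and latencies and emits a StatsPoint here
  }
}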
@@ -23,6 +23,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter
private static final byte[] DURATION = "Duration".getBytes(ISO_8859_1);
private static final byte[] PATHWAY_LATENCY = "PathwayLatency".getBytes(ISO_8859_1);
private static final byte[] EDGE_LATENCY = "EdgeLatency".getBytes(ISO_8859_1);
private static final byte[] PAYLOAD_SIZE = "PayloadSize".getBytes(ISO_8859_1);
private static final byte[] SERVICE = "Service".getBytes(ISO_8859_1);
private static final byte[] EDGE_TAGS = "EdgeTags".getBytes(ISO_8859_1);
private static final byte[] BACKLOGS = "Backlogs".getBytes(ISO_8859_1);
@@ -118,7 +119,7 @@ private void writeBucket(StatsBucket bucket, Writable packer) {
for (StatsGroup group : groups) {
boolean firstNode = group.getEdgeTags().isEmpty();

packer.startMap(firstNode ? 4 : 5);
packer.startMap(firstNode ? 5 : 6);

/* 1 */
packer.writeUTF8(PATHWAY_LATENCY);
@@ -129,15 +130,19 @@ private void writeBucket(StatsBucket bucket, Writable packer) {
packer.writeBinary(group.getEdgeLatency().serialize());

/* 3 */
packer.writeUTF8(PAYLOAD_SIZE);
packer.writeBinary(group.getPayloadSize().serialize());

/* 4 */
packer.writeUTF8(HASH);
packer.writeUnsignedLong(group.getHash());

/* 4 */
/* 5 */
packer.writeUTF8(PARENT_HASH);
packer.writeUnsignedLong(group.getParentHash());

if (!firstNode) {
/* 5 */
/* 6 */
packer.writeUTF8(EDGE_TAGS);
packer.startArray(group.getEdgeTags().size());
for (String tag : group.getEdgeTags()) {
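With the new entry, each stats-group map carries 5 keys for the root node and 6 once EdgeTags are present, written in the order PathwayLatency, EdgeLatency, PayloadSize, Hash, ParentHash, EdgeTags. A hedged sketch of reading one such group back with msgpack-core, assuming the raw (uncompressed) bytes of a single group map are already in hand; the Groovy test further down does the same thing against the full payload:

import java.io.IOException;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;

final class StatsGroupReaderSketch {
  // Sketch: decode one stats-group map as written by writeBucket above.
  static void dumpGroup(byte[] groupBytes) throws IOException {
    try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(groupBytes)) {
      int entries = unpacker.unpackMapHeader();       // 5 for the root node, 6 with EdgeTags
      unpacker.unpackString();                        // "PathwayLatency"
      unpacker.skipValue();                           // serialized latency histogram
      unpacker.unpackString();                        // "EdgeLatency"
      unpacker.skipValue();
      unpacker.unpackString();                        // "PayloadSize" (new)
      unpacker.skipValue();
      unpacker.unpackString();                        // "Hash"
      long hash = unpacker.unpackLong();
      unpacker.unpackString();                        // "ParentHash"
      long parentHash = unpacker.unpackLong();
      System.out.println("entries=" + entries + " hash=" + hash + " parentHash=" + parentHash);
    }
  }
}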
@@ -29,7 +29,10 @@ public void addPoint(StatsPoint statsPoint) {
hashToGroup.put(statsPoint.getHash(), statsGroup);
}

statsGroup.add(statsPoint.getPathwayLatencyNano(), statsPoint.getEdgeLatencyNano());
statsGroup.add(
statsPoint.getPathwayLatencyNano(),
statsPoint.getEdgeLatencyNano(),
statsPoint.getPayloadSizeBytes());
}

public void addBacklog(Backlog backlog) {
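statsPoint.getPayloadSizeBytes() implies StatsPoint itself grew a seventh field. The class is not part of this excerpt, but from the constructor calls in the test changes below, its shape is presumably roughly the following (a sketch with inferred field names):

import java.util.List;

// Presumed shape of StatsPoint after this change; the real class is not shown in this diff.
final class StatsPointSketch {
  private final List<String> edgeTags;
  private final long hash;
  private final long parentHash;
  private final long timestampNanos;
  private final long pathwayLatencyNano;
  private final long edgeLatencyNano;
  private final long payloadSizeBytes; // new: carried from the checkpoint to the aggregator

  StatsPointSketch(
      List<String> edgeTags,
      long hash,
      long parentHash,
      long timestampNanos,
      long pathwayLatencyNano,
      long edgeLatencyNano,
      long payloadSizeBytes) {
    this.edgeTags = edgeTags;
    this.hash = hash;
    this.parentHash = parentHash;
    this.timestampNanos = timestampNanos;
    this.pathwayLatencyNano = pathwayLatencyNano;
    this.edgeLatencyNano = edgeLatencyNano;
    this.payloadSizeBytes = payloadSizeBytes;
  }

  long getPayloadSizeBytes() {
    return payloadSizeBytes;
  }
}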
@@ -12,18 +12,23 @@ public class StatsGroup {
private final long parentHash;
private final Histogram pathwayLatency;
private final Histogram edgeLatency;
private final Histogram payloadSize;

public StatsGroup(List<String> edgeTags, long hash, long parentHash) {
this.edgeTags = edgeTags;
this.hash = hash;
this.parentHash = parentHash;
pathwayLatency = Histograms.newHistogram();
edgeLatency = Histograms.newHistogram();
payloadSize = Histograms.newHistogram();
}

public void add(long pathwayLatencyNano, long edgeLatencyNano) {
public void add(long pathwayLatencyNano, long edgeLatencyNano, long payloadSizeBytes) {
pathwayLatency.accept(((double) pathwayLatencyNano) / NANOSECONDS_TO_SECOND);
edgeLatency.accept(((double) edgeLatencyNano) / NANOSECONDS_TO_SECOND);
// payload size is set to zero when we cannot compute it
// in that case, it's probably better to have an empty histogram than filling it with zeros
if (payloadSizeBytes != 0) payloadSize.accept((double) payloadSizeBytes);
}

public List<String> getEdgeTags() {
@@ -46,6 +51,10 @@ public Histogram getEdgeLatency() {
return edgeLatency;
}

public Histogram getPayloadSize() {
return payloadSize;
}

@Override
public String toString() {
return "StatsGroup{"
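Call sites that cannot measure the payload (HTTP, gRPC, SQS and RabbitMQ above) report 0, and add() deliberately drops those zeros so unknown sizes do not drag the distribution towards zero. A minimal stand-in for that rule, using a plain list instead of the agent's Histogram implementation:

import java.util.ArrayList;
import java.util.List;

public class PayloadSizeSkipSketch {
  public static void main(String[] args) {
    List<Double> payloadSize = new ArrayList<>();
    long[] observed = {512, 0, 1024, 0}; // 0 means "size could not be computed"
    for (long bytes : observed) {
      if (bytes != 0) {
        payloadSize.add((double) bytes); // unknown sizes are skipped, not recorded as zeros
      }
    }
    System.out.println(payloadSize); // [512.0, 1024.0]
  }
}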
@@ -20,11 +20,10 @@ import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.util.concurrent.PollingConditions

import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
import static DefaultDataStreamsMonitoring.DEFAULT_BUCKET_DURATION_NANOS
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
import static java.util.concurrent.TimeUnit.SECONDS


/**
* This test class exists because a real integration test is not possible. see DataStreamsIntegrationTest
*/
@@ -79,15 +78,15 @@ class DataStreamsWritingTest extends DDCoreSpecification {
when:
def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig })
dataStreams.start()
dataStreams.add(new StatsPoint([], 9, 0, timeSource.currentTimeNanos, 0, 0))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"1", "topic":"testTopic", "type":"kafka_produce"]), 100)
dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"1", "topic":"testTopic", "type":"kafka_produce"]), 130)
dataStreams.add(new StatsPoint([], 9, 0, timeSource.currentTimeNanos, 0, 0, 0))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 100)
dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130)
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l)
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10)))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10))
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5)))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2))
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.report()

@@ -132,15 +131,17 @@ class DataStreamsWritingTest extends DDCoreSpecification {
assert unpacker.unpackString() == "Stats"
assert unpacker.unpackArrayHeader() == 2 // 2 groups in first bucket

Set availableSizes = [4, 5] // we don't know the order the groups will be reported
Set availableSizes = [5, 6] // we don't know the order the groups will be reported
2.times {
int mapHeaderSize = unpacker.unpackMapHeader()
assert availableSizes.remove(mapHeaderSize)
if (mapHeaderSize == 4) { // empty topic group
if (mapHeaderSize == 5) { // empty topic group
assert unpacker.unpackString() == "PathwayLatency"
unpacker.skipValue()
assert unpacker.unpackString() == "EdgeLatency"
unpacker.skipValue()
assert unpacker.unpackString() == "PayloadSize"
unpacker.skipValue()
assert unpacker.unpackString() == "Hash"
assert unpacker.unpackLong() == 9
assert unpacker.unpackString() == "ParentHash"
@@ -150,6 +151,8 @@
unpacker.skipValue()
assert unpacker.unpackString() == "EdgeLatency"
unpacker.skipValue()
assert unpacker.unpackString() == "PayloadSize"
unpacker.skipValue()
assert unpacker.unpackString() == "Hash"
assert unpacker.unpackLong() == 1
assert unpacker.unpackString() == "ParentHash"
@@ -185,11 +188,13 @@

Set<Long> availableHashes = [1L, 3L] // we don't know the order the groups will be reported
2.times {
assert unpacker.unpackMapHeader() == 5
assert unpacker.unpackMapHeader() == 6
assert unpacker.unpackString() == "PathwayLatency"
unpacker.skipValue()
assert unpacker.unpackString() == "EdgeLatency"
unpacker.skipValue()
assert unpacker.unpackString() == "PayloadSize"
unpacker.skipValue()
assert unpacker.unpackString() == "Hash"
def hash = unpacker.unpackLong()
assert availableHashes.remove(hash)