Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track payload size for Kafka integration #6045

Merged
merged 6 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public String[] helperClassNames() {
packageName + ".TracingIterable",
packageName + ".TracingIterator",
packageName + ".TracingList",
packageName + ".TracingListIterator"
packageName + ".TracingListIterator",
packageName + ".Utils"
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_DELIVER;
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.TIME_IN_QUEUE_ENABLED;
import static datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter.GETTER;
import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import datadog.trace.api.Config;
Expand Down Expand Up @@ -92,7 +93,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, val.timestamp());
.setCheckpoint(span, sortedTags, val.timestamp(), computePayloadSizeBytes(val));
} else {
span = startSpan(operationName, null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package datadog.trace.instrumentation.kafka_clients;

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

public class Utils {
vandonr marked this conversation as resolved.
Show resolved Hide resolved
// this method is used in kafka-clients and kafka-streams instrumentations
public static long computePayloadSizeBytes(ConsumerRecord<?, ?> val) {
long headersSize = 0;
Headers headers = val.headers();
if (headers != null)
for (Header h : headers) {
headersSize += h.value().length + h.key().getBytes(StandardCharsets.UTF_8).length;
}
return headersSize + val.serializedKeySize() + val.serializedValueSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static datadog.trace.core.datastreams.TagsProcessor.GROUP_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
import static datadog.trace.instrumentation.kafka_clients.Utils.computePayloadSizeBytes;
import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.BROKER_DECORATE;
import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE;
import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.KAFKA_CONSUME;
Expand Down Expand Up @@ -36,6 +37,7 @@
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
Expand All @@ -59,6 +61,7 @@ public String instrumentedType() {
public String[] helperClassNames() {
return new String[] {
"datadog.trace.instrumentation.kafka_clients.TracingIterableDelegator",
"datadog.trace.instrumentation.kafka_clients.Utils",
packageName + ".KafkaStreamsDecorator",
packageName + ".ProcessorRecordContextVisitor",
packageName + ".StampedRecordContextVisitor",
Expand Down Expand Up @@ -244,7 +247,8 @@ public static void start(
sortedTags.put(TYPE_TAG, "kafka");
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, record.timestamp);
.setCheckpoint(
span, sortedTags, record.timestamp, computePayloadSizeBytes(record.value));
} else {
span = startSpan(KAFKA_CONSUME, null);
}
Expand Down Expand Up @@ -307,9 +311,18 @@ public static void start(
}
sortedTags.put(TOPIC_TAG, record.topic());
sortedTags.put(TYPE_TAG, "kafka");

long payloadSize = 0;
// we have to go through Object to get the RecordMetadata here because the class of `record`
// only implements it after 2.7 (and this class is only used if v >= 2.7)
if ((Object) record instanceof RecordMetadata) { // should always be true
RecordMetadata metadata = (RecordMetadata) (Object) record;
payloadSize = metadata.serializedKeySize() + metadata.serializedValueSize();
}

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, record.timestamp());
.setCheckpoint(span, sortedTags, record.timestamp(), payloadSize);
} else {
span = startSpan(KAFKA_CONSUME, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
static final long DEFAULT_BUCKET_DURATION_NANOS = TimeUnit.SECONDS.toNanos(10);
static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5);

private static final StatsPoint REPORT = new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0);
private static final StatsPoint REPORT =
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0);
private static final StatsPoint POISON_PILL =
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0);
new StatsPoint(Collections.emptyList(), 0, 0, 0, 0, 0, 0);

private final Map<Long, StatsBucket> timeToBucket = new HashMap<>();
private final BlockingQueue<InboxItem> inbox = new MpscBlockingConsumerArrayQueue<>(1024);
Expand Down Expand Up @@ -205,9 +206,18 @@ public void trackBacklog(LinkedHashMap<String, String> sortedTags, long value) {
@Override
public void setCheckpoint(
AgentSpan span, LinkedHashMap<String, String> sortedTags, long defaultTimestamp) {
setCheckpoint(span, sortedTags, defaultTimestamp, 0);
}

@Override
public void setCheckpoint(
AgentSpan span,
LinkedHashMap<String, String> sortedTags,
long defaultTimestamp,
long payloadSizeBytes) {
vandonr marked this conversation as resolved.
Show resolved Hide resolved
PathwayContext pathwayContext = span.context().getPathwayContext();
if (pathwayContext != null) {
pathwayContext.setCheckpoint(sortedTags, this::add, defaultTimestamp);
pathwayContext.setCheckpoint(sortedTags, this::add, defaultTimestamp, payloadSizeBytes);
if (pathwayContext.getHash() != 0) {
span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,23 @@ public long getHash() {
@Override
public void setCheckpoint(
LinkedHashMap<String, String> sortedTags, Consumer<StatsPoint> pointConsumer) {
setCheckpoint(sortedTags, pointConsumer, 0);
setCheckpoint(sortedTags, pointConsumer, 0, 0);
}

@Override
public void setCheckpoint(
LinkedHashMap<String, String> sortedTags,
Consumer<StatsPoint> pointConsumer,
long defaultTimestamp) {
setCheckpoint(sortedTags, pointConsumer, defaultTimestamp, 0);
}

@Override
public void setCheckpoint(
LinkedHashMap<String, String> sortedTags,
Consumer<StatsPoint> pointConsumer,
long defaultTimestamp,
long payloadSizeBytes) {
vandonr marked this conversation as resolved.
Show resolved Hide resolved
long startNanos = timeSource.getCurrentTimeNanos();
long nanoTicks = timeSource.getNanoTicks();
lock.lock();
Expand Down Expand Up @@ -167,7 +176,8 @@ public void setCheckpoint(
hash,
timeSource.getCurrentTimeNanos(),
pathwayLatencyNano,
edgeLatencyNano);
edgeLatencyNano,
payloadSizeBytes);
edgeStartNanoTicks = nanoTicks;
hash = newHash;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter
private static final byte[] DURATION = "Duration".getBytes(ISO_8859_1);
private static final byte[] PATHWAY_LATENCY = "PathwayLatency".getBytes(ISO_8859_1);
private static final byte[] EDGE_LATENCY = "EdgeLatency".getBytes(ISO_8859_1);
private static final byte[] PAYLOAD_SIZE = "PayloadSize".getBytes(ISO_8859_1);
private static final byte[] SERVICE = "Service".getBytes(ISO_8859_1);
private static final byte[] EDGE_TAGS = "EdgeTags".getBytes(ISO_8859_1);
private static final byte[] BACKLOGS = "Backlogs".getBytes(ISO_8859_1);
Expand Down Expand Up @@ -118,7 +119,7 @@ private void writeBucket(StatsBucket bucket, Writable packer) {
for (StatsGroup group : groups) {
boolean firstNode = group.getEdgeTags().isEmpty();

packer.startMap(firstNode ? 4 : 5);
packer.startMap(firstNode ? 5 : 6);

/* 1 */
packer.writeUTF8(PATHWAY_LATENCY);
Expand All @@ -129,15 +130,19 @@ private void writeBucket(StatsBucket bucket, Writable packer) {
packer.writeBinary(group.getEdgeLatency().serialize());

/* 3 */
packer.writeUTF8(PAYLOAD_SIZE);
packer.writeBinary(group.getPayloadSize().serialize());

/* 4 */
packer.writeUTF8(HASH);
packer.writeUnsignedLong(group.getHash());

/* 4 */
/* 5 */
packer.writeUTF8(PARENT_HASH);
packer.writeUnsignedLong(group.getParentHash());

if (!firstNode) {
/* 5 */
/* 6 */
packer.writeUTF8(EDGE_TAGS);
packer.startArray(group.getEdgeTags().size());
for (String tag : group.getEdgeTags()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ public void addPoint(StatsPoint statsPoint) {
hashToGroup.put(statsPoint.getHash(), statsGroup);
}

statsGroup.add(statsPoint.getPathwayLatencyNano(), statsPoint.getEdgeLatencyNano());
statsGroup.add(
statsPoint.getPathwayLatencyNano(),
statsPoint.getEdgeLatencyNano(),
statsPoint.getPayloadSizeBytes());
}

public void addBacklog(Backlog backlog) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ public class StatsGroup {
private final long parentHash;
private final Histogram pathwayLatency;
private final Histogram edgeLatency;
private final Histogram payloadSize;

public StatsGroup(List<String> edgeTags, long hash, long parentHash) {
this.edgeTags = edgeTags;
this.hash = hash;
this.parentHash = parentHash;
pathwayLatency = Histograms.newHistogram();
edgeLatency = Histograms.newHistogram();
payloadSize = Histograms.newHistogram();
}

public void add(long pathwayLatencyNano, long edgeLatencyNano) {
public void add(long pathwayLatencyNano, long edgeLatencyNano, long payloadSizeBytes) {
pathwayLatency.accept(((double) pathwayLatencyNano) / NANOSECONDS_TO_SECOND);
edgeLatency.accept(((double) edgeLatencyNano) / NANOSECONDS_TO_SECOND);
// payload size is set to zero when we cannot compute it
// in that case, it's probably better to have an empty histogram than filling it with zeros
if (payloadSizeBytes != 0) payloadSize.accept((double) payloadSizeBytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't even know spotless would agree to use such kind of one-liner... 🤔

}

public List<String> getEdgeTags() {
Expand All @@ -46,6 +51,10 @@ public Histogram getEdgeLatency() {
return edgeLatency;
}

public Histogram getPayloadSize() {
return payloadSize;
}

@Override
public String toString() {
return "StatsGroup{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.util.concurrent.PollingConditions

import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
import static DefaultDataStreamsMonitoring.DEFAULT_BUCKET_DURATION_NANOS
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
import static java.util.concurrent.TimeUnit.SECONDS


/**
* This test class exists because a real integration test is not possible. see DataStreamsIntegrationTest
*/
Expand Down Expand Up @@ -79,15 +78,15 @@ class DataStreamsWritingTest extends DDCoreSpecification {
when:
def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig })
dataStreams.start()
dataStreams.add(new StatsPoint([], 9, 0, timeSource.currentTimeNanos, 0, 0))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0))
dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"1", "topic":"testTopic", "type":"kafka_produce"]), 100)
dataStreams.trackBacklog(new LinkedHashMap<>(["partition":"1", "topic":"testTopic", "type":"kafka_produce"]), 130)
dataStreams.add(new StatsPoint([], 9, 0, timeSource.currentTimeNanos, 0, 0, 0))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, 0, 0, 0))
dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 100)
dataStreams.trackBacklog(new LinkedHashMap<>(["partition": "1", "topic": "testTopic", "type": "kafka_produce"]), 130)
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS - 100l)
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10)))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(10), SECONDS.toNanos(10), 10))
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5)))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, timeSource.currentTimeNanos, SECONDS.toNanos(5), SECONDS.toNanos(5), 5))
dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic2"], 3, 4, timeSource.currentTimeNanos, SECONDS.toNanos(2), 0, 2))
timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS)
dataStreams.report()

Expand Down Expand Up @@ -132,15 +131,17 @@ class DataStreamsWritingTest extends DDCoreSpecification {
assert unpacker.unpackString() == "Stats"
assert unpacker.unpackArrayHeader() == 2 // 2 groups in first bucket

Set availableSizes = [4, 5] // we don't know the order the groups will be reported
Set availableSizes = [5, 6] // we don't know the order the groups will be reported
2.times {
int mapHeaderSize = unpacker.unpackMapHeader()
assert availableSizes.remove(mapHeaderSize)
if (mapHeaderSize == 4) { // empty topic group
if (mapHeaderSize == 5) { // empty topic group
assert unpacker.unpackString() == "PathwayLatency"
unpacker.skipValue()
assert unpacker.unpackString() == "EdgeLatency"
unpacker.skipValue()
assert unpacker.unpackString() == "PayloadSize"
unpacker.skipValue()
assert unpacker.unpackString() == "Hash"
assert unpacker.unpackLong() == 9
assert unpacker.unpackString() == "ParentHash"
Expand All @@ -150,6 +151,8 @@ class DataStreamsWritingTest extends DDCoreSpecification {
unpacker.skipValue()
assert unpacker.unpackString() == "EdgeLatency"
unpacker.skipValue()
assert unpacker.unpackString() == "PayloadSize"
unpacker.skipValue()
assert unpacker.unpackString() == "Hash"
assert unpacker.unpackLong() == 1
assert unpacker.unpackString() == "ParentHash"
Expand Down Expand Up @@ -185,11 +188,13 @@ class DataStreamsWritingTest extends DDCoreSpecification {

Set<Long> availableHashes = [1L, 3L] // we don't know the order the groups will be reported
2.times {
assert unpacker.unpackMapHeader() == 5
assert unpacker.unpackMapHeader() == 6
assert unpacker.unpackString() == "PathwayLatency"
unpacker.skipValue()
assert unpacker.unpackString() == "EdgeLatency"
unpacker.skipValue()
assert unpacker.unpackString() == "PayloadSize"
unpacker.skipValue()
assert unpacker.unpackString() == "Hash"
def hash = unpacker.unpackLong()
assert availableHashes.remove(hash)
Expand Down
Loading
Loading