Skip to content

Commit

Permalink
Add high rate queue for log template snapshots (#7310)
Browse files Browse the repository at this point in the history
Historically, the queue inside SnapshotSink was designed for a low
rate of snapshots (1 per second), and its consumer thread is
the one shared with all other tasks (AgentTaskScheduler.INSTANCE). As a
result, that thread does not drain the queue quickly enough to
handle the up to 5000 snapshots/s allowed for log template probes.
We therefore introduce a dedicated high-rate queue with dedicated threads
that are more responsive (at most 100 ms between polls). All non-capturing
snapshots are added to this queue.
  • Loading branch information
jpbempel authored Jul 15, 2024
1 parent 0e5e274 commit 2975ecb
Show file tree
Hide file tree
Showing 14 changed files with 544 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import com.datadog.debugger.exception.ExceptionProbeManager;
import com.datadog.debugger.sink.DebuggerSink;
import com.datadog.debugger.sink.ProbeStatusSink;
import com.datadog.debugger.sink.SnapshotSink;
import com.datadog.debugger.sink.SymbolSink;
import com.datadog.debugger.symbol.SymDBEnablement;
import com.datadog.debugger.symbol.SymbolAggregator;
import com.datadog.debugger.uploader.BatchUploader;
import com.datadog.debugger.util.ClassNameFiltering;
import com.datadog.debugger.util.DebuggerMetrics;
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.remoteconfig.ConfigurationPoller;
Expand All @@ -19,7 +22,9 @@
import datadog.trace.api.flare.TracerFlare;
import datadog.trace.bootstrap.debugger.DebuggerContext;
import datadog.trace.bootstrap.debugger.util.Redaction;
import datadog.trace.core.DDTraceCoreInfo;
import datadog.trace.util.SizeCheckedInputStream;
import datadog.trace.util.TagsHelper;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
Expand All @@ -30,6 +35,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.stream.Collectors;
import java.util.zip.ZipOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -63,7 +69,7 @@ public static synchronized void run(
ProbeStatusSink probeStatusSink =
new ProbeStatusSink(
config, diagnosticEndpoint, ddAgentFeaturesDiscovery.supportsDebuggerDiagnostics());
DebuggerSink debuggerSink = new DebuggerSink(config, probeStatusSink);
DebuggerSink debuggerSink = createDebuggerSink(config, probeStatusSink);
debuggerSink.start();
ClassNameFiltering classNameFiltering = new ClassNameFiltering(config);
ConfigurationUpdater configurationUpdater =
Expand Down Expand Up @@ -122,9 +128,7 @@ public static synchronized void run(
Note: shutdown hooks are tricky because JVM holds reference for them forever preventing
GC for anything that is reachable from it.
*/
Runtime.getRuntime()
.addShutdownHook(
new ShutdownHook(configurationPoller, debuggerSink.getSnapshotUploader()));
Runtime.getRuntime().addShutdownHook(new ShutdownHook(configurationPoller, debuggerSink));
} catch (final IllegalStateException ex) {
// The JVM is already shutting down.
}
Expand All @@ -139,6 +143,39 @@ public static synchronized void run(
new DebuggerReporter(configurationUpdater, sink, exceptionProbeManager));
}

/**
 * Assembles the {@link DebuggerSink} used by the agent.
 *
 * <p>The tag string returned by {@link #getDefaultTagsMergedWithGlobalTags(Config)} is handed to
 * both the {@link SnapshotSink} and the {@link DebuggerSink}. The snapshot sink is backed by a
 * {@link BatchUploader} created for {@code config.getFinalDebuggerSnapshotUrl()}.
 *
 * @param config configuration passed to every component created here
 * @param probeStatusSink already-built probe status sink to embed in the result
 * @return a new {@link DebuggerSink} wired with the snapshot, symbol and probe status sinks
 */
private static DebuggerSink createDebuggerSink(Config config, ProbeStatusSink probeStatusSink) {
  final String mergedTags = getDefaultTagsMergedWithGlobalTags(config);
  final BatchUploader snapshotUploader =
      new BatchUploader(config, config.getFinalDebuggerSnapshotUrl());
  final SnapshotSink snapshots = new SnapshotSink(config, mergedTags, snapshotUploader);
  final SymbolSink symbols = new SymbolSink(config);
  return new DebuggerSink(
      config, mergedTags, DebuggerMetrics.getInstance(config), probeStatusSink, snapshots, symbols);
}

/**
 * Builds the tag string for the debugger: default tags, followed by the global tags if any.
 *
 * <p>The default tags are produced by {@code TagsHelper.concatTags} from the {@code env},
 * {@code version}, {@code debugger_version}, {@code agent_version} and {@code host_name} entries.
 * When {@code config.getGlobalTags()} is not empty, each entry is rendered as {@code key:value},
 * the entries are joined with commas, and the result is appended to the default tags after a
 * comma.
 *
 * @param config configuration supplying env, version, host name and global tags
 * @return the default tags alone when there are no global tags, otherwise the default tags
 *     followed by {@code ","} and the joined global tags
 */
public static String getDefaultTagsMergedWithGlobalTags(Config config) {
  final String defaultTags =
      TagsHelper.concatTags(
          "env:" + config.getEnv(),
          "version:" + config.getVersion(),
          "debugger_version:" + DDTraceCoreInfo.VERSION,
          "agent_version:" + DebuggerAgent.getAgentVersion(),
          "host_name:" + config.getHostName());
  if (!config.getGlobalTags().isEmpty()) {
    // Render every global tag as "key:value" and append them after the default tags.
    final String joinedGlobalTags =
        config.getGlobalTags().entrySet().stream()
            .map(entry -> entry.getKey() + ":" + entry.getValue())
            .collect(Collectors.joining(","));
    return defaultTags + "," + joinedGlobalTags;
  }
  return defaultTags;
}

private static String getDiagnosticEndpoint(
Config config, DDAgentFeaturesDiscovery ddAgentFeaturesDiscovery) {
if (ddAgentFeaturesDiscovery.supportsDebuggerDiagnostics()) {
Expand Down Expand Up @@ -245,12 +282,12 @@ public static JsonSnapshotSerializer getSnapshotSerializer() {
private static class ShutdownHook extends Thread {

private final WeakReference<ConfigurationPoller> pollerRef;
private final WeakReference<BatchUploader> uploaderRef;
private final WeakReference<DebuggerSink> sinkRef;

private ShutdownHook(ConfigurationPoller poller, BatchUploader uploader) {
private ShutdownHook(ConfigurationPoller poller, DebuggerSink debuggerSink) {
super(AGENT_THREAD_GROUP, "dd-debugger-shutdown-hook");
pollerRef = new WeakReference<>(poller);
uploaderRef = new WeakReference<>(uploader);
sinkRef = new WeakReference<>(debuggerSink);
}

@Override
Expand All @@ -264,10 +301,10 @@ public void run() {
}
}

final BatchUploader uploader = uploaderRef.get();
if (uploader != null) {
final DebuggerSink sink = sinkRef.get();
if (sink != null) {
try {
uploader.shutdown();
sink.stop();
} catch (Exception ex) {
LOGGER.warn("Failed to shutdown SnapshotUploader", ex);
}
Expand Down Expand Up @@ -296,7 +333,7 @@ public void addReportToFlare(ZipOutputStream zip) throws IOException {
String.join(
System.lineSeparator(),
"Snapshot url: ",
sink.getSnapshotUploader().getUrl().toString(),
sink.getSnapshotSink().getUrl().toString(),
"Diagnostic url: ",
sink.getProbeStatusSink().getUrl().toString(),
"SymbolDB url: ",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
import com.datadog.debugger.probe.Where;
import com.datadog.debugger.sink.DebuggerSink;
import com.datadog.debugger.sink.ProbeStatusSink;
import com.datadog.debugger.sink.SnapshotSink;
import com.datadog.debugger.sink.SymbolSink;
import com.datadog.debugger.uploader.BatchUploader;
import com.datadog.debugger.util.ClassFileLines;
import com.datadog.debugger.util.DebuggerMetrics;
import com.datadog.debugger.util.ExceptionHelper;
import datadog.trace.agent.tooling.AgentStrategies;
import datadog.trace.api.Config;
Expand Down Expand Up @@ -117,7 +121,13 @@ public DebuggerTransformer(Config config, Configuration configuration) {
configuration,
null,
new DebuggerSink(
config, new ProbeStatusSink(config, config.getFinalDebuggerSnapshotUrl(), false)));
config,
"",
DebuggerMetrics.getInstance(config),
new ProbeStatusSink(config, config.getFinalDebuggerSnapshotUrl(), false),
new SnapshotSink(
config, "", new BatchUploader(config, config.getFinalDebuggerSnapshotUrl())),
new SymbolSink(config)));
}

private void readExcludeFiles(String commaSeparatedFileNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,10 @@ protected void commitSnapshot(Snapshot snapshot, DebuggerSink sink) {
*/
if (isCaptureSnapshot()) {
snapshot.recordStackTrace(5);
sink.addSnapshot(snapshot);
} else {
sink.addHighRateSnapshot(snapshot);
}
sink.addSnapshot(snapshot);
}

@Override
Expand Down
Loading

0 comments on commit 2975ecb

Please sign in to comment.