Skip to content

Commit

Permalink
Telemetry v2 (#5994)
Browse files Browse the repository at this point in the history
message batching
config origin and default config values
Intake support
extended-heartbeat
  • Loading branch information
ygree committed Oct 31, 2023
1 parent 4ad36f5 commit 6be4cdb
Show file tree
Hide file tree
Showing 54 changed files with 2,980 additions and 1,080 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.continue.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ instrumentation_modules: &instrumentation_modules "dd-java-agent/instrumentation
debugger_modules: &debugger_modules "dd-java-agent/agent-debugger|dd-java-agent/agent-bootstrap|dd-java-agent/agent-builder|internal-api|communication|dd-trace-core"
profiling_modules: &profiling_modules "dd-java-agent/agent-profiling"

default_system_tests_commit: &default_system_tests_commit 97bada5205ff411ec62ba9c2c15e66cd5a49fbb0
default_system_tests_commit: &default_system_tests_commit 2487cea5160a398549743d2cfd927a863792e3bd

parameters:
nightly:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy {

public static final String DEBUGGER_ENDPOINT = "debugger/v1/input";

public static final String TELEMETRY_PROXY_ENDPOINT = "telemetry/proxy/";

private static final long MIN_FEATURE_DISCOVERY_INTERVAL_MILLIS = 60 * 1000;

private final OkHttpClient client;
Expand All @@ -58,6 +60,7 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy {
private final boolean metricsEnabled;
private final String[] dataStreamsEndpoints = {V01_DATASTREAMS_ENDPOINT};
private final String[] evpProxyEndpoints = {V2_EVP_PROXY_ENDPOINT};
private final String[] telemetryProxyEndpoints = {TELEMETRY_PROXY_ENDPOINT};

private volatile String traceEndpoint;
private volatile String metricsEndpoint;
Expand All @@ -69,6 +72,7 @@ public class DDAgentFeaturesDiscovery implements DroppingPolicy {
private volatile String debuggerEndpoint;
private volatile String evpProxyEndpoint;
private volatile String version;
private volatile String telemetryProxyEndpoint;

private long lastTimeDiscovered;

Expand Down Expand Up @@ -100,6 +104,7 @@ private void reset() {
evpProxyEndpoint = null;
version = null;
lastTimeDiscovered = 0;
telemetryProxyEndpoint = null;
}

/** Run feature discovery, unconditionally. */
Expand Down Expand Up @@ -162,14 +167,15 @@ private void doDiscovery() {

if (log.isDebugEnabled()) {
log.debug(
"discovered traceEndpoint={}, metricsEndpoint={}, supportsDropping={}, supportsLongRunning={}, dataStreamsEndpoint={}, configEndpoint={}, evpProxyEndpoint={}",
"discovered traceEndpoint={}, metricsEndpoint={}, supportsDropping={}, supportsLongRunning={}, dataStreamsEndpoint={}, configEndpoint={}, evpProxyEndpoint={}, telemetryProxyEndpoint={}",
traceEndpoint,
metricsEndpoint,
supportsDropping,
supportsLongRunning,
dataStreamsEndpoint,
configEndpoint,
evpProxyEndpoint);
evpProxyEndpoint,
telemetryProxyEndpoint);
}
}

Expand Down Expand Up @@ -247,6 +253,13 @@ private boolean processInfoResponse(String response) {
}
}

for (String endpoint : telemetryProxyEndpoints) {
if (endpoints.contains(endpoint) || endpoints.contains("/" + endpoint)) {
telemetryProxyEndpoint = endpoint;
break;
}
}

supportsLongRunning = Boolean.TRUE.equals(map.getOrDefault("long_running_spans", false));

if (metricsEnabled) {
Expand Down Expand Up @@ -352,4 +365,8 @@ public String state() {
public boolean active() {
return supportsMetrics() && supportsDropping;
}

public boolean supportsTelemetryProxy() {
return telemetryProxyEndpoint != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import spock.lang.Shared

import java.nio.file.Files
import java.nio.file.Paths
import java.util.concurrent.CountDownLatch

import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V01_DATASTREAMS_ENDPOINT
import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V6_METRICS_ENDPOINT
Expand All @@ -38,6 +37,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
static final String INFO_WITHOUT_DATA_STREAMS_RESPONSE = loadJsonFile("agent-info-without-data-streams.json")
static final String INFO_WITHOUT_DATA_STREAMS_STATE = Strings.sha256(INFO_WITHOUT_DATA_STREAMS_RESPONSE)
static final String INFO_WITH_LONG_RUNNING_SPANS = loadJsonFile("agent-info-with-long-running-spans.json")
static final String INFO_WITH_TELEMETRY_PROXY_RESPONSE = loadJsonFile("agent-info-with-telemetry-proxy.json")
static final String PROBE_STATE = "probestate"

def "test parse /info response"() {
Expand All @@ -62,6 +62,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
features.supportsEvpProxy()
features.getVersion() == "0.99.0"
!features.supportsLongRunning()
!features.supportsTelemetryProxy()
0 * _
}

Expand Down Expand Up @@ -89,6 +90,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
features.supportsEvpProxy()
features.getVersion() == "0.99.0"
!features.supportsLongRunning()
!features.supportsTelemetryProxy()
0 * _
}

Expand Down Expand Up @@ -384,17 +386,22 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
// but we don't permit dropping anyway
!(features as DroppingPolicy).active()
features.state() == INFO_WITHOUT_METRICS_STATE
!features.supportsTelemetryProxy()
0 * _
}

def countingNotFound(Request request, CountDownLatch latch) {
latch.countDown()
return notFound(request)
}
def "test parse /info response with telemetry proxy"() {
setup:
OkHttpClient client = Mock(OkHttpClient)
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, true, true)

def countingInfoResponse(Request request, String json, CountDownLatch latch) {
latch.countDown()
return infoResponse(request, json)
when: "/info available"
features.discover()

then:
1 * client.newCall(_) >> { Request request -> infoResponse(request, INFO_WITH_TELEMETRY_PROXY_RESPONSE) }
features.supportsTelemetryProxy()
0 * _
}

def infoResponse(Request request, String json) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
{
"version": "0.99.0",
"git_commit": "fab047e10",
"build_date": "2020-12-04 15:57:06.74187 +0200 EET m=+0.029001792",
"endpoints": [
"/v0.3/traces",
"/v0.3/services",
"/v0.4/traces",
"/v0.4/services",
"/v0.5/traces",
"/v0.6/stats",
"/profiling/v1/input",
"/telemetry/proxy/",
"/v0.1/pipeline_stats",
"/evp_proxy/v1/",
"/evp_proxy/v2/",
"/debugger/v1/input",
"/v0.7/config"
],
"feature_flags": [
"feature_flag"
],
"config": {
"default_env": "prod",
"bucket_interval": 1000000000,
"extra_aggregators": [
"agg:val"
],
"extra_sample_rate": 2.4,
"target_tps": 11,
"max_eps": 12,
"receiver_port": 8111,
"receiver_socket": "/sock/path",
"connection_limit": 12,
"receiver_timeout": 100,
"max_request_bytes": 123,
"statsd_port": 123,
"max_memory": 1000000,
"max_cpu": 12345,
"analyzed_rate_by_service_legacy": {
"X": 1.2
},
"analyzed_spans_by_service": {
"X": {
"Y": 2.4
}
},
"obfuscation": {
"elastic_search": true,
"mongo": true,
"sql_exec_plan": true,
"sql_exec_plan_normalize": true,
"http": {
"remove_query_string": true,
"remove_path_digits": true
},
"remove_stack_traces": false,
"redis": true,
"memcached": false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class UndertowServletTest extends HttpServerTest<Undertow> {

DeploymentManager manager = container.addDeployment(builder)
manager.deploy()
System.out.println(">>> builder.getContextPath(): " + builder.getContextPath())
root.addPrefixPath(builder.getContextPath(), manager.start())

undertowServer = Undertow.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class CustomLogManagerTest extends Specification {
"-Djava.util.logging.manager=jvmbootstraptest.MissingLogManager"
] as String[]
, "" as String[]
, ["DD_API_KEY": API_KEY]
, ["DD_API_KEY": API_KEY, "DD_SITE": ""]
, true) == 0
}

Expand Down Expand Up @@ -87,7 +87,7 @@ class CustomLogManagerTest extends Specification {
"-Ddd.app.customjmxbuilder=false"
] as String[]
, "" as String[]
, ["JBOSS_HOME": "/", "DD_API_KEY": API_KEY]
, ["JBOSS_HOME": "/", "DD_API_KEY": API_KEY, "DD_SITE": ""]
, true) == 0
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static datadog.trace.api.config.TraceInstrumentationConfig.LOGS_INJECTION_ENABLED;

import datadog.trace.api.ConfigCollector;
import datadog.trace.api.ConfigSetting;
import datadog.trace.api.CorrelationIdentifier;
import datadog.trace.api.Trace;
import java.util.concurrent.TimeUnit;
Expand All @@ -22,15 +23,13 @@ public void run() throws InterruptedException {

secondTracedMethod();

if (!waitForCondition(
() -> Boolean.FALSE.equals(ConfigCollector.get().collect().get(LOGS_INJECTION_ENABLED)))) {
if (!waitForCondition(() -> Boolean.FALSE.equals(getLogInjectionEnabled()))) {
throw new RuntimeException("Logs injection config was never updated");
}

thirdTracedMethod();

if (!waitForCondition(
() -> Boolean.TRUE.equals(ConfigCollector.get().collect().get(LOGS_INJECTION_ENABLED)))) {
if (!waitForCondition(() -> Boolean.TRUE.equals(getLogInjectionEnabled()))) {
throw new RuntimeException("Logs injection config was never updated a second time");
}

Expand All @@ -44,6 +43,14 @@ public void run() throws InterruptedException {
Thread.sleep(400);
}

private static Object getLogInjectionEnabled() {
ConfigSetting configSetting = ConfigCollector.get().collect().get(LOGS_INJECTION_ENABLED);
if (configSetting == null) {
return null;
}
return configSetting.value;
}

@Trace
public void firstTracedMethod() {
doLog("INSIDE FIRST SPAN");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ abstract class AbstractSmokeTest extends ProcessManager {
"-Ddd.profiling.ddprof.enabled=true",
"-Ddd.profiling.ddprof.alloc.enabled=true",
"-Ddatadog.slf4j.simpleLogger.defaultLogLevel=${logLevel()}",
"-Dorg.slf4j.simpleLogger.defaultLogLevel=${logLevel()}"
"-Dorg.slf4j.simpleLogger.defaultLogLevel=${logLevel()}",
"-Ddd.site="
]

@Shared
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ public final class ConfigDefaults {

static final boolean DEFAULT_TELEMETRY_ENABLED = true;
static final int DEFAULT_TELEMETRY_HEARTBEAT_INTERVAL = 60; // in seconds
static final int DEFAULT_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL =
24 * 60 * 60; // 24 hours in seconds
static final int DEFAULT_TELEMETRY_METRICS_INTERVAL = 10; // in seconds
static final boolean DEFAULT_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED = true;

Expand All @@ -197,8 +199,8 @@ public final class ConfigDefaults {
static final boolean DEFAULT_ELASTICSEARCH_BODY_AND_PARAMS_ENABLED = false;

static final boolean DEFAULT_SPARK_TASK_HISTOGRAM_ENABLED = true;

static final boolean DEFAULT_JAX_RS_EXCEPTION_AS_ERROR_ENABLED = true;
static final boolean DEFAULT_TELEMETRY_DEBUG_REQUESTS_ENABLED = false;

private ConfigDefaults() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,14 @@ public final class GeneralConfig {

public static final String TELEMETRY_ENABLED = "instrumentation.telemetry.enabled";
public static final String TELEMETRY_HEARTBEAT_INTERVAL = "telemetry.heartbeat.interval";
public static final String TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL =
"telemetry.extended.heartbeat.interval";
public static final String TELEMETRY_METRICS_INTERVAL = "telemetry.metrics.interval";
public static final String TELEMETRY_METRICS_ENABLED = "telemetry.metrics.enabled";
public static final String TELEMETRY_DEPENDENCY_COLLECTION_ENABLED =
"telemetry.dependency-collection.enabled";

public static final String TELEMETRY_DEBUG_REQUESTS_ENABLED = "telemetry.debug.requests.enabled";

private GeneralConfig() {}
}
Loading

0 comments on commit 6be4cdb

Please sign in to comment.