427 changes: 299 additions & 128 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion crates/datadog-serverless-compat/src/main.rs
@@ -17,10 +17,12 @@ use tracing_subscriber::EnvFilter;
use zstd::zstd_safe::CompressionLevel;

use datadog_trace_agent::{
aggregator::TraceAggregator,
trace_aggregator::TraceAggregator,
config, env_verifier, mini_agent, stats_flusher, stats_processor,
trace_flusher::{self, TraceFlusher},
trace_processor,
proxy_aggregator,
proxy_flusher,
};

use datadog_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentType};
@@ -120,13 +122,20 @@ pub async fn main() {
Arc::clone(&config),
));

let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::ProxyAggregator::default()));
let proxy_flusher = Arc::new(proxy_flusher::ProxyFlusher::new(
proxy_aggregator,
Arc::clone(&config),
));

let mini_agent = Box::new(mini_agent::MiniAgent {
config: Arc::clone(&config),
env_verifier,
trace_processor,
trace_flusher,
stats_processor,
stats_flusher,
proxy_flusher,
});

tokio::spawn(async move {
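
The proxy pipeline is wired the same way as the stats pipeline above: the aggregator lives behind Arc<TokioMutex<...>> so the flusher task and the request handler can share it. proxy_flusher.rs itself is not shown in this diff, so the following constructor shape is only a sketch inferred from this call site:

use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;

use crate::{config::Config, proxy_aggregator::ProxyAggregator};

// Hypothetical sketch; the real proxy_flusher.rs is not part of this diff view.
pub struct ProxyFlusher {
    aggregator: Arc<TokioMutex<ProxyAggregator>>, // shared with the enqueueing side
    config: Arc<Config>, // supplies proxy_intake, proxy_flush_interval, proxy_url
}

impl ProxyFlusher {
    pub fn new(aggregator: Arc<TokioMutex<ProxyAggregator>>, config: Arc<Config>) -> Self {
        Self { aggregator, config }
    }
}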
3 changes: 3 additions & 0 deletions crates/datadog-trace-agent/Cargo.toml
@@ -23,6 +23,9 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog/", rev =
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog/", rev = "4eb2b8673354f974591c61bab3f7d485b4c119e0", features = ["mini_agent"] }
datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog/", rev = "4eb2b8673354f974591c61bab3f7d485b4c119e0" }
datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog/", rev = "4eb2b8673354f974591c61bab3f7d485b4c119e0" }
datadog-fips = { path = "../datadog-fips", default-features = false }
reqwest = { version = "0.12.23", features = ["json"] }
bytes = "1.10.1"

[dev-dependencies]
rmp-serde = "1.1.1"
12 changes: 11 additions & 1 deletion crates/datadog-trace-agent/src/config.rs
@@ -11,7 +11,7 @@ use std::sync::OnceLock
use datadog_trace_obfuscation::obfuscation_config;
use datadog_trace_utils::config_utils::{
read_cloud_env, trace_intake_url, trace_intake_url_prefixed, trace_stats_url,
trace_stats_url_prefixed,
trace_stats_url_prefixed
};
use datadog_trace_utils::trace_utils;

@@ -86,6 +86,9 @@ pub struct Config {
pub trace_flush_interval: u64,
pub trace_intake: Endpoint,
pub trace_stats_intake: Endpoint,
/// how often to flush proxy requests, in seconds
pub proxy_flush_interval: u64,
pub proxy_intake: Endpoint,
/// timeout for environment verification, in milliseconds
pub verify_env_timeout: u64,
pub proxy_url: Option<String>,
@@ -111,6 +114,7 @@ impl Config {
// trace stats to)
let mut trace_intake_url = trace_intake_url(&dd_site);
let mut trace_stats_intake_url = trace_stats_url(&dd_site);
let proxy_intake_url = format!("https://intake.profile.{}/api/v2/profile", dd_site);

// DD_APM_DD_URL env var will primarily be used for integration tests
// overrides the entire trace/trace stats intake url prefix
@@ -139,6 +143,7 @@ impl Config {
max_request_content_length: 10 * 1024 * 1024, // 10MB in Bytes
trace_flush_interval: 3,
stats_flush_interval: 3,
proxy_flush_interval: 3,
verify_env_timeout: 100,
dd_dogstatsd_port,
dd_site,
@@ -149,6 +154,11 @@
},
trace_stats_intake: Endpoint {
url: hyper::Uri::from_str(&trace_stats_intake_url).unwrap(),
api_key: Some(api_key.clone()),
..Default::default()
},
proxy_intake: Endpoint {
url: hyper::Uri::from_str(&proxy_intake_url).unwrap(),
api_key: Some(api_key),
..Default::default()
},
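
One concrete data point for the URL construction above: with the default site, the format! call produces the profiling intake endpoint directly. A quick standalone check (plain std, nothing from this crate):

fn main() {
    let dd_site = "datadoghq.com";
    let proxy_intake_url = format!("https://intake.profile.{}/api/v2/profile", dd_site);
    assert_eq!(
        proxy_intake_url,
        "https://intake.profile.datadoghq.com/api/v2/profile"
    );
}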
16 changes: 16 additions & 0 deletions crates/datadog-trace-agent/src/http_utils.rs
@@ -9,6 +9,9 @@ use hyper::{
};
use serde_json::json;
use tracing::{debug, error};
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
use core::time::Duration;
use std::error::Error;

/// Does two things:
/// 1. Logs the given message. A success status code (within 200-299) will cause an info log to be
@@ -111,6 +114,19 @@ pub fn verify_request_content_length(
None
}

/// Builds a reqwest client with optional proxy configuration and timeout.
/// Uses FIPS-compliant TLS when the fips feature is enabled.
pub fn build_client(
proxy_url: Option<&str>,
timeout: Duration,
) -> Result<reqwest::Client, Box<dyn Error>> {
let mut builder = create_reqwest_client_builder()?.timeout(timeout);
if let Some(proxy) = proxy_url {
builder = builder.proxy(reqwest::Proxy::all(proxy)?);
}
Ok(builder.build()?)
}

#[cfg(test)]
mod tests {
use ddcommon::hyper_migration;
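
A possible call site for build_client, assuming the caller has the Config from config.rs in scope; the ten-second timeout is illustrative and not a value this PR sets anywhere:

use core::time::Duration;

use crate::{config::Config, http_utils};

// Sketch: how a flusher might construct its upstream client.
fn make_upstream_client(config: &Config) -> reqwest::Client {
    http_utils::build_client(config.proxy_url.as_deref(), Duration::from_secs(10))
        .expect("failed to build reqwest client")
}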
4 changes: 3 additions & 1 deletion crates/datadog-trace-agent/src/lib.rs
@@ -7,11 +7,13 @@
#![cfg_attr(not(test), deny(clippy::todo))]
#![cfg_attr(not(test), deny(clippy::unimplemented))]

pub mod aggregator;
pub mod trace_aggregator;
pub mod config;
pub mod env_verifier;
pub mod http_utils;
pub mod mini_agent;
pub mod proxy_aggregator;
pub mod proxy_flusher;
pub mod stats_flusher;
pub mod stats_processor;
pub mod trace_flusher;
84 changes: 81 additions & 3 deletions crates/datadog-trace-agent/src/mini_agent.rs
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use ddcommon::hyper_migration;
use http_body_util::BodyExt;
use hyper::service::service_fn;
use hyper::{http, Method, Response, StatusCode};
use serde_json::json;
@@ -12,8 +13,8 @@ use std::time::Instant;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tracing::{debug, error};

use crate::http_utils::log_and_create_http_response;
use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor};
use crate::http_utils::{self, log_and_create_http_response};
use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor, proxy_aggregator::ProxyRequest, proxy_flusher};
use datadog_trace_protobuf::pb;
use datadog_trace_utils::trace_utils;
use datadog_trace_utils::trace_utils::SendData;
@@ -22,8 +23,10 @@ const MINI_AGENT_PORT: usize = 8126;
const TRACE_ENDPOINT_PATH: &str = "/v0.4/traces";
const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";
const INFO_ENDPOINT_PATH: &str = "/info";
const PROFILING_ENDPOINT_PATH: &str = "/profiling/v1/input";
const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
const STATS_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
const PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;

pub struct MiniAgent {
pub config: Arc<config::Config>,
@@ -32,6 +35,7 @@ pub struct MiniAgent {
pub stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
pub stats_flusher: Arc<dyn stats_flusher::StatsFlusher + Send + Sync>,
pub env_verifier: Arc<dyn env_verifier::EnvVerifier + Send + Sync>,
pub proxy_flusher: Arc<proxy_flusher::ProxyFlusher>,
}

impl MiniAgent {
@@ -83,6 +87,18 @@ impl MiniAgent {
.start_stats_flusher(stats_config, stats_rx)
.await;
});
// channel to send processed profiling requests to our proxy flusher.
let (proxy_tx, proxy_rx): (
Sender<ProxyRequest>,
Receiver<ProxyRequest>,
) = mpsc::channel(PROXY_PAYLOAD_CHANNEL_BUFFER_SIZE);

// start our proxy flusher for profiling requests
let proxy_flusher = self.proxy_flusher.clone();
tokio::spawn(async move {
let proxy_flusher = proxy_flusher.clone();
proxy_flusher.start_proxy_flusher(proxy_rx).await;
});

// setup our hyper http server, where the endpoint_handler handles incoming requests
let trace_processor = self.trace_processor.clone();
@@ -99,6 +115,8 @@ impl MiniAgent {
let endpoint_config = endpoint_config.clone();
let mini_agent_metadata = Arc::clone(&mini_agent_metadata);

let proxy_tx = proxy_tx.clone();

MiniAgent::trace_endpoint_handler(
endpoint_config.clone(),
req.map(hyper_migration::Body::incoming),
Expand All @@ -107,6 +125,7 @@ impl MiniAgent {
stats_processor.clone(),
stats_tx.clone(),
Arc::clone(&mini_agent_metadata),
proxy_tx.clone(),
)
});

@@ -170,6 +189,7 @@ impl MiniAgent {
stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
stats_tx: Sender<pb::ClientStatsPayload>,
mini_agent_metadata: Arc<trace_utils::MiniAgentMetadata>,
proxy_tx: Sender<ProxyRequest>,
) -> http::Result<hyper_migration::HttpResponse> {
match (req.method(), req.uri().path()) {
(&Method::PUT | &Method::POST, TRACE_ENDPOINT_PATH) => {
Expand All @@ -193,6 +213,15 @@ impl MiniAgent {
),
}
}
(&Method::POST, PROFILING_ENDPOINT_PATH) => {
match Self::profiling_proxy_handler(config, req, proxy_tx).await {
Ok(res) => Ok(res),
Err(err) => log_and_create_http_response(
&format!("Error processing profiling request: {err}"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}
(_, INFO_ENDPOINT_PATH) => match Self::info_handler(config.dd_dogstatsd_port) {
Ok(res) => Ok(res),
Err(err) => log_and_create_http_response(
@@ -208,13 +237,61 @@ impl MiniAgent {
}
}

async fn profiling_proxy_handler(
config: Arc<config::Config>,
request: hyper_migration::HttpRequest,
proxy_tx: Sender<ProxyRequest>,
) -> http::Result<hyper_migration::HttpResponse> {
debug!("Trace Agent | Proxied request for profiling");

// Extract headers and body
let (parts, body) = request.into_parts();
if let Some(response) = http_utils::verify_request_content_length(
&parts.headers,
config.max_request_content_length,
"Error processing profiling request",
) {
return response;
}

let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
return log_and_create_http_response(
&format!("Error reading profiling request body: {e}"),
StatusCode::BAD_REQUEST,
);
}
};

// Create proxy request
let proxy_request = ProxyRequest {
headers: parts.headers,
body: body_bytes,
target_url: config.proxy_intake.url.to_string(),
};

// Send to channel - flusher will aggregate and send
match proxy_tx.send(proxy_request).await {
Ok(_) => log_and_create_http_response(
"Successfully buffered profiling request to be flushed",
StatusCode::OK,
),
Err(err) => log_and_create_http_response(
&format!("Error sending profiling request to the proxy flusher: {err}"),
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}

fn info_handler(dd_dogstatsd_port: u16) -> http::Result<hyper_migration::HttpResponse> {
let response_json = json!(
{
"endpoints": [
TRACE_ENDPOINT_PATH,
STATS_ENDPOINT_PATH,
INFO_ENDPOINT_PATH
INFO_ENDPOINT_PATH,
PROFILING_ENDPOINT_PATH
],
"client_drop_p0s": true,
"config": {
@@ -226,4 +303,5 @@ impl MiniAgent {
.status(200)
.body(hyper_migration::Body::from(response_json.to_string()))
}

}
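
Seen from a profiler client, the new route behaves like the trace endpoint: POST to the local agent on port 8126 (MINI_AGENT_PORT), and a 200 only confirms the payload was buffered for the flusher, not that Datadog accepted it. A minimal smoke test against a running mini agent (the body bytes are placeholders, not a valid profile):

// Sketch: POST a payload to the new endpoint of a locally running mini agent.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    let resp = client
        .post("http://127.0.0.1:8126/profiling/v1/input")
        .body(vec![0u8; 16]) // stand-in for a real profile payload
        .send()
        .await?;
    // 200 here means "buffered to be flushed", not "delivered upstream".
    println!("status: {}", resp.status());
    Ok(())
}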
39 changes: 39 additions & 0 deletions crates/datadog-trace-agent/src/proxy_aggregator.rs
@@ -0,0 +1,39 @@
use bytes::Bytes;
use reqwest::header::HeaderMap;

#[derive(Clone)]
pub struct ProxyRequest {
pub headers: HeaderMap,
pub body: Bytes,
pub target_url: String,
}

/// Takes in individual proxy requests and aggregates them into batches to be flushed to Datadog.
pub struct ProxyAggregator {
queue: Vec<ProxyRequest>,
}

impl Default for ProxyAggregator {
fn default() -> Self {
ProxyAggregator {
queue: Vec::with_capacity(128), // arbitrary capacity for request queue
}
}
}

impl ProxyAggregator {
/// Takes in an individual proxy request.
pub fn add(&mut self, request: ProxyRequest) {
self.queue.push(request);
}

/// Returns a batch of proxy requests.
pub fn get_batch(&mut self) -> Vec<ProxyRequest> {
std::mem::take(&mut self.queue)
}

/// Flush the queue.
pub fn clear(&mut self) {
self.queue.clear();
}
}
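
get_batch drains the queue with mem::take, so a consumer can swap the buffer out and release the lock before doing any network I/O. The flush loop that would sit on top of this is not part of this diff; a sketch modeled on the stats flusher, with the select!/interval structure as an assumption:

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc::Receiver, Mutex as TokioMutex};

use crate::proxy_aggregator::{ProxyAggregator, ProxyRequest};

// Hypothetical flush loop: buffer incoming requests, drain them on a timer.
async fn run_flush_loop(
    aggregator: Arc<TokioMutex<ProxyAggregator>>,
    mut rx: Receiver<ProxyRequest>,
    flush_interval_secs: u64,
) {
    let mut ticker = tokio::time::interval(Duration::from_secs(flush_interval_secs));
    loop {
        tokio::select! {
            Some(request) = rx.recv() => {
                aggregator.lock().await.add(request);
            }
            _ = ticker.tick() => {
                // Take the whole queue, then send without holding the lock.
                let batch = aggregator.lock().await.get_batch();
                for _request in batch {
                    // Forward _request.body to _request.target_url with
                    // _request.headers, e.g. via http_utils::build_client.
                }
            }
        }
    }
}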