Skip to content

Commit c0abc20

Browse files
committed
Handle proxy payload
1 parent b0b0cb9 commit c0abc20

File tree

4 files changed

+107
-4
lines changed

4 files changed

+107
-4
lines changed

crates/datadog-trace-agent/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog/", rev =
2323
datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog/", rev = "4eb2b8673354f974591c61bab3f7d485b4c119e0", features = ["mini_agent"] }
2424
datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog/", rev = "4eb2b8673354f974591c61bab3f7d485b4c119e0" }
2525
datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog/", rev = "4eb2b8673354f974591c61bab3f7d485b4c119e0" }
26+
reqwest = { version = "0.12.23", features = ["json"] }
27+
bytes = "1.10.1"
2628

2729
[dev-dependencies]
2830
rmp-serde = "1.1.1"

crates/datadog-trace-agent/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod config;
1212
pub mod env_verifier;
1313
pub mod http_utils;
1414
pub mod mini_agent;
15+
pub mod proxy_aggregator;
1516
pub mod stats_flusher;
1617
pub mod stats_processor;
1718
pub mod trace_flusher;

crates/datadog-trace-agent/src/mini_agent.rs

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,22 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use ddcommon::hyper_migration;
5+
use http_body_util::BodyExt;
56
use hyper::service::service_fn;
67
use hyper::{http, Method, Response, StatusCode};
78
use serde_json::json;
89
use std::io;
910
use std::net::SocketAddr;
1011
use std::sync::Arc;
1112
use std::time::Instant;
12-
use tokio::sync::mpsc::{self, Receiver, Sender};
13+
use tokio::sync::{
14+
Mutex,
15+
mpsc::{self, Receiver, Sender},
16+
};
1317
use tracing::{debug, error};
1418

15-
use crate::http_utils::log_and_create_http_response;
16-
use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor};
19+
use crate::http_utils::{self, log_and_create_http_response};
20+
use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor, proxy_aggregator::{self, ProxyRequest}};
1721
use datadog_trace_protobuf::pb;
1822
use datadog_trace_utils::trace_utils;
1923
use datadog_trace_utils::trace_utils::SendData;
@@ -22,6 +26,7 @@ const MINI_AGENT_PORT: usize = 8126;
2226
const TRACE_ENDPOINT_PATH: &str = "/v0.4/traces";
2327
const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";
2428
const INFO_ENDPOINT_PATH: &str = "/info";
29+
const PROFILING_ENDPOINT_PATH: &str = "/profiling/v1/input";
2530
const TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
2631
const STATS_PAYLOAD_CHANNEL_BUFFER_SIZE: usize = 10;
2732

@@ -32,6 +37,7 @@ pub struct MiniAgent {
3237
pub stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
3338
pub stats_flusher: Arc<dyn stats_flusher::StatsFlusher + Send + Sync>,
3439
pub env_verifier: Arc<dyn env_verifier::EnvVerifier + Send + Sync>,
40+
pub proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
3541
}
3642

3743
impl MiniAgent {
@@ -84,10 +90,17 @@ impl MiniAgent {
8490
.await;
8591
});
8692

93+
// start our proxy flusher for profiling requests
94+
let proxy_aggregator_for_flusher = self.proxy_aggregator.clone();
95+
tokio::spawn(async move {
96+
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
97+
});
98+
8799
// setup our hyper http server, where the endpoint_handler handles incoming requests
88100
let trace_processor = self.trace_processor.clone();
89101
let stats_processor = self.stats_processor.clone();
90102
let endpoint_config = self.config.clone();
103+
let proxy_aggregator = self.proxy_aggregator.clone();
91104

92105
let service = service_fn(move |req| {
93106
let trace_processor = trace_processor.clone();
@@ -98,6 +111,7 @@ impl MiniAgent {
98111

99112
let endpoint_config = endpoint_config.clone();
100113
let mini_agent_metadata = Arc::clone(&mini_agent_metadata);
114+
let proxy_aggregator = proxy_aggregator.clone();
101115

102116
MiniAgent::trace_endpoint_handler(
103117
endpoint_config.clone(),
@@ -107,6 +121,7 @@ impl MiniAgent {
107121
stats_processor.clone(),
108122
stats_tx.clone(),
109123
Arc::clone(&mini_agent_metadata),
124+
proxy_aggregator.clone(),
110125
)
111126
});
112127

@@ -170,6 +185,7 @@ impl MiniAgent {
170185
stats_processor: Arc<dyn stats_processor::StatsProcessor + Send + Sync>,
171186
stats_tx: Sender<pb::ClientStatsPayload>,
172187
mini_agent_metadata: Arc<trace_utils::MiniAgentMetadata>,
188+
proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
173189
) -> http::Result<hyper_migration::HttpResponse> {
174190
match (req.method(), req.uri().path()) {
175191
(&Method::PUT | &Method::POST, TRACE_ENDPOINT_PATH) => {
@@ -193,6 +209,15 @@ impl MiniAgent {
193209
),
194210
}
195211
}
212+
(&Method::POST, PROFILING_ENDPOINT_PATH) => {
213+
match Self::profiling_proxy_handler(config, req, proxy_aggregator).await {
214+
Ok(res) => Ok(res),
215+
Err(err) => log_and_create_http_response(
216+
&format!("Error processing profiling request: {err}"),
217+
StatusCode::INTERNAL_SERVER_ERROR,
218+
),
219+
}
220+
}
196221
(_, INFO_ENDPOINT_PATH) => match Self::info_handler(config.dd_dogstatsd_port) {
197222
Ok(res) => Ok(res),
198223
Err(err) => log_and_create_http_response(
@@ -208,13 +233,49 @@ impl MiniAgent {
208233
}
209234
}
210235

236+
async fn profiling_proxy_handler(
237+
config: Arc<config::Config>,
238+
request: hyper_migration::HttpRequest,
239+
proxy_aggregator: Arc<Mutex<proxy_aggregator::Aggregator>>,
240+
) -> Result<hyper_migration::HttpResponse, Box<dyn std::error::Error + Send + Sync>> {
241+
debug!("Trace Agent | Proxied request for profiling");
242+
243+
// Extract headers and body
244+
let (parts, body) = request.into_parts();
245+
if let Some(response) = http_utils::verify_request_content_length(
246+
&parts.headers,
247+
config.max_request_content_length,
248+
"Error processing profiling request",
249+
) {
250+
return response.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
251+
}
252+
253+
let body_bytes = body.collect().await?.to_bytes();
254+
255+
// Create proxy request
256+
let proxy_request = ProxyRequest {
257+
headers: parts.headers,
258+
body: body_bytes,
259+
target_url: format!("https://intake.profile.{}/api/v2/profile", config.dd_site),
260+
};
261+
262+
let mut proxy_aggregator = proxy_aggregator.lock().await;
263+
proxy_aggregator.add(proxy_request);
264+
265+
Response::builder()
266+
.status(200)
267+
.body(hyper_migration::Body::from("Acknowledged profiling request"))
268+
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
269+
}
270+
211271
fn info_handler(dd_dogstatsd_port: u16) -> http::Result<hyper_migration::HttpResponse> {
212272
let response_json = json!(
213273
{
214274
"endpoints": [
215275
TRACE_ENDPOINT_PATH,
216276
STATS_ENDPOINT_PATH,
217-
INFO_ENDPOINT_PATH
277+
INFO_ENDPOINT_PATH,
278+
PROFILING_ENDPOINT_PATH
218279
],
219280
"client_drop_p0s": true,
220281
"config": {
@@ -226,4 +287,5 @@ impl MiniAgent {
226287
.status(200)
227288
.body(hyper_migration::Body::from(response_json.to_string()))
228289
}
290+
229291
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use bytes::Bytes; // TODO: Do we use bytes?
2+
use reqwest::header::HeaderMap; // TODO: Do we use reqwest?
3+
4+
pub struct ProxyRequest {
5+
pub headers: HeaderMap,
6+
pub body: Bytes,
7+
pub target_url: String,
8+
}
9+
10+
/// Takes in individual proxy requests and aggregates them into batches to be flushed to Datadog.
11+
pub struct Aggregator {
12+
queue: Vec<ProxyRequest>,
13+
}
14+
15+
impl Default for Aggregator {
16+
fn default() -> Self {
17+
Aggregator {
18+
queue: Vec::with_capacity(128), // arbitrary capacity for request queue
19+
}
20+
}
21+
}
22+
23+
impl Aggregator {
24+
/// Takes in an individual proxy request.
25+
pub fn add(&mut self, request: ProxyRequest) {
26+
self.queue.push(request);
27+
}
28+
29+
/// Returns a batch of proxy requests.
30+
pub fn get_batch(&mut self) -> Vec<ProxyRequest> {
31+
std::mem::take(&mut self.queue)
32+
}
33+
34+
/// Flush the queue.
35+
pub fn clear(&mut self) {
36+
self.queue.clear();
37+
}
38+
}

0 commit comments

Comments
 (0)