Commit b1583da

astuyve and Copilot authored
Optionally redrive failed requests (#19)
* feat: retry/repush to support continuous background flushing
* feat: optionally retry/redrive failed traces
* Update crates/datadog-trace-agent/src/trace_flusher.rs
  Co-authored-by: Copilot <[email protected]>
* Revert "Update crates/datadog-trace-agent/src/trace_flusher.rs"
  This reverts commit e943bd1.

Co-authored-by: Copilot <[email protected]>
1 parent e37f0a1 commit b1583da

3 files changed: +134, -28 lines

crates/datadog-trace-agent/src/trace_flusher.rs

Lines changed: 42 additions & 12 deletions
@@ -21,10 +21,13 @@ pub trait TraceFlusher {
     /// implementing flushing logic that calls flush_traces.
     async fn start_trace_flusher(&self, mut rx: Receiver<SendData>);
     /// Given a `Vec<SendData>`, a tracer payload, send it to the Datadog intake endpoint.
-    async fn send(&self, traces: Vec<SendData>);
+    /// Returns the traces back if there was an error sending them.
+    async fn send(&self, traces: Vec<SendData>) -> Option<Vec<SendData>>;
 
     /// Flushes traces by getting every available batch on the aggregator.
-    async fn flush(&self);
+    /// If `failed_traces` is provided, it will attempt to send those instead of fetching new traces.
+    /// Returns any traces that failed to send and should be retried.
+    async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>>;
 }
 
 #[derive(Clone)]
@@ -51,39 +54,66 @@ impl TraceFlusher for ServerlessTraceFlusher {
 
         loop {
             tokio::time::sleep(time::Duration::from_secs(self.config.trace_flush_interval)).await;
-            self.flush().await;
+            self.flush(None).await;
         }
     }
 
-    async fn flush(&self) {
-        let mut guard = self.aggregator.lock().await;
+    async fn flush(&self, failed_traces: Option<Vec<SendData>>) -> Option<Vec<SendData>> {
+        let mut failed_batch: Option<Vec<SendData>> = None;
+
+        if let Some(traces) = failed_traces {
+            // If we have traces from a previous failed attempt, try to send those first
+            if !traces.is_empty() {
+                debug!("Retrying to send {} previously failed traces", traces.len());
+                let retry_result = self.send(traces).await;
+                if retry_result.is_some() {
+                    // Still failed, return to retry later
+                    return retry_result;
+                }
+            }
+        }
 
+        // Process new traces from the aggregator
+        let mut guard = self.aggregator.lock().await;
         let mut traces = guard.get_batch();
+
         while !traces.is_empty() {
-            self.send(traces).await;
+            if let Some(failed) = self.send(traces).await {
+                // Keep track of the failed batch
+                failed_batch = Some(failed);
+                // Stop processing more batches if we have a failure
+                break;
+            }
 
             traces = guard.get_batch();
         }
+
+        failed_batch
     }
 
-    async fn send(&self, traces: Vec<SendData>) {
+    async fn send(&self, traces: Vec<SendData>) -> Option<Vec<SendData>> {
         if traces.is_empty() {
-            return;
+            return None;
         }
         debug!("Flushing {} traces", traces.len());
 
-        for traces in trace_utils::coalesce_send_data(traces) {
-            match traces
+        // Since we return the original traces on error, we need to clone them before coalescing
+        let traces_clone = traces.clone();
+
+        for coalesced_traces in trace_utils::coalesce_send_data(traces) {
+            match coalesced_traces
                 .send_proxy(self.config.proxy_url.as_deref())
                 .await
                 .last_result
             {
                 Ok(_) => debug!("Successfully flushed traces"),
                 Err(e) => {
-                    error!("Error sending trace: {e:?}")
-                    // TODO: Retries
+                    error!("Error sending trace: {e:?}");
+                    // Return the original traces for retry
+                    return Some(traces_clone);
                 }
             }
         }
+        None
     }
 }
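Taken together, `flush` now retries any previously failed batch before draining the aggregator, and hands back whatever still could not be sent. A minimal sketch of a driver loop that threads that return value back in on the next cycle; the `redrive_loop` function, its parameters, and the interval are illustrative assumptions, not part of this commit:

use std::time::Duration;

// Hypothetical redrive loop (names assumed for illustration): `flush`
// retries last cycle's failures first, then drains the aggregator, and
// returns anything that still failed.
async fn redrive_loop(flusher: &ServerlessTraceFlusher, interval: Duration) {
    let mut failed: Option<Vec<SendData>> = None;
    loop {
        tokio::time::sleep(interval).await;
        // Whatever comes back failed again and is carried into the next cycle.
        failed = flusher.flush(failed).await;
    }
}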

crates/dogstatsd/src/datadog.rs

Lines changed: 3 additions & 3 deletions
@@ -304,7 +304,7 @@ pub(crate) struct Point {
     pub(crate) value: f64,
 }
 
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Clone)]
 /// A named resource
 pub(crate) struct Resource {
     /// The name of this resource
@@ -335,7 +335,7 @@ impl Serialize for DdMetricKind {
     }
 }
 
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Clone)]
 #[allow(clippy::struct_field_names)]
 /// A named collection of `Point` instances.
 pub(crate) struct Metric {
@@ -351,7 +351,7 @@ pub(crate) struct Metric {
     pub(crate) tags: Vec<String>,
 }
 
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Clone)]
 /// A collection of metrics as defined by the Datadog Metrics API.
 // NOTE we have a number of `Vec` instances in this implementation that could
 // otherwise be arrays, given that we have constants. Serializing to JSON would
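These `Clone` derives exist to support the retry path: flusher.rs below clones the collected batches before shipping them (`all_series.clone()`), which only compiles if the series collection and its nested `Metric` and `Resource` values are `Clone`.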

crates/dogstatsd/src/flusher.rs

Lines changed: 89 additions & 13 deletions
@@ -38,8 +38,32 @@ impl Flusher {
         }
     }
 
-    pub async fn flush(&mut self) {
-        let (all_series, all_distributions) = {
+    pub async fn flush(
+        &mut self,
+    ) -> Option<(
+        Vec<crate::datadog::Series>,
+        Vec<datadog_protos::metrics::SketchPayload>,
+    )> {
+        self.flush_with_retries(None, None).await
+    }
+
+    pub async fn flush_with_retries(
+        &mut self,
+        retry_series: Option<Vec<crate::datadog::Series>>,
+        retry_sketches: Option<Vec<datadog_protos::metrics::SketchPayload>>,
+    ) -> Option<(
+        Vec<crate::datadog::Series>,
+        Vec<datadog_protos::metrics::SketchPayload>,
+    )> {
+        let (all_series, all_distributions) = if retry_series.is_some() || retry_sketches.is_some()
+        {
+            // Use the provided metrics for retry
+            (
+                retry_series.unwrap_or_default(),
+                retry_sketches.unwrap_or_default(),
+            )
+        } else {
+            // Collect new metrics from the aggregator
             #[allow(clippy::expect_used)]
             let mut aggregator = self.aggregator.lock().expect("lock poisoned");
             (
@@ -53,35 +77,68 @@ impl Flusher {
 
         debug!("Flushing {n_series} series and {n_distributions} distributions");
 
+        // Save copies for potential error returns
+        let all_series_copy = all_series.clone();
+        let all_distributions_copy = all_distributions.clone();
+
         let dd_api_clone = self.dd_api.clone();
         let series_handle = tokio::spawn(async move {
+            let mut failed = Vec::new();
+            let mut had_shipping_error = false;
             for a_batch in all_series {
-                let continue_shipping =
+                let (continue_shipping, should_retry) =
                     should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await;
+                if should_retry {
+                    failed.push(a_batch);
+                    had_shipping_error = true;
+                }
                 if !continue_shipping {
                     break;
                 }
             }
+            (failed, had_shipping_error)
         });
+
         let dd_api_clone = self.dd_api.clone();
         let distributions_handle = tokio::spawn(async move {
+            let mut failed = Vec::new();
+            let mut had_shipping_error = false;
             for a_batch in all_distributions {
-                let continue_shipping =
+                let (continue_shipping, should_retry) =
                     should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await;
+                if should_retry {
+                    failed.push(a_batch);
+                    had_shipping_error = true;
+                }
                 if !continue_shipping {
                     break;
                 }
             }
+            (failed, had_shipping_error)
        });
 
         match tokio::try_join!(series_handle, distributions_handle) {
-            Ok(_) => {
-                debug!("Successfully flushed {n_series} series and {n_distributions} distributions")
+            Ok(((series_failed, series_had_error), (sketches_failed, sketches_had_error))) => {
+                if series_failed.is_empty() && sketches_failed.is_empty() {
+                    debug!("Successfully flushed {n_series} series and {n_distributions} distributions");
+                    None // Return None to indicate success
+                } else if series_had_error || sketches_had_error {
+                    // Only return the metrics if there was an actual shipping error
+                    error!("Failed to flush some metrics due to shipping errors: {} series and {} sketches",
+                        series_failed.len(), sketches_failed.len());
+                    // Return the failed metrics for potential retry
+                    Some((series_failed, sketches_failed))
+                } else {
+                    debug!("Some metrics were not sent but no errors occurred");
+                    None // No shipping errors, so don't return metrics for retry
+                }
             }
             Err(err) => {
-                error!("Failed to flush metrics{err}")
+                error!("Failed to flush metrics: {err}");
+                // Return all metrics in case of join error for potential retry
+                Some((all_series_copy, all_distributions_copy))
             }
-        };
+        }
     }
 }
 
@@ -90,26 +147,45 @@ pub enum ShippingError {
     Destination(Option<StatusCode>, String),
 }
 
-async fn should_try_next_batch(resp: Result<Response, ShippingError>) -> bool {
+/// Returns a tuple (continue_to_next_batch, should_retry_this_batch)
+async fn should_try_next_batch(resp: Result<Response, ShippingError>) -> (bool, bool) {
     match resp {
         Ok(resp_payload) => match resp_payload.status() {
-            StatusCode::ACCEPTED => true,
+            StatusCode::ACCEPTED => (true, false), // Success, continue to next batch, no need to retry
             unexpected_status_code => {
+                // Check if the status code indicates a permanent error (4xx) or a temporary error (5xx)
+                let is_permanent_error =
+                    unexpected_status_code.as_u16() >= 400 && unexpected_status_code.as_u16() < 500;
+
                 error!(
                     "{}: Failed to push to API: {:?}",
                     unexpected_status_code,
                     resp_payload.text().await.unwrap_or_default()
                 );
-                true
+
+                if is_permanent_error {
+                    (true, false) // Permanent error, continue to next batch but don't retry
+                } else {
+                    (false, true) // Temporary error, don't continue to next batch and mark for retry
+                }
             }
         },
         Err(ShippingError::Payload(msg)) => {
             error!("Failed to prepare payload. Data dropped: {}", msg);
-            true
+            (true, false) // Payload error, continue to next batch but don't retry (data is malformed)
        }
         Err(ShippingError::Destination(sc, msg)) => {
+            // Check if status code indicates a permanent error
+            let is_permanent_error =
+                sc.map_or(false, |code| code.as_u16() >= 400 && code.as_u16() < 500);
+
             error!("Error shipping data: {:?} {}", sc, msg);
-            false
+
+            if is_permanent_error {
+                (false, false) // Permanent destination error, don't continue and don't retry
+            } else {
+                (false, true) // Temporary error, don't continue and mark for retry
+            }
         }
     }
 }
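A minimal sketch of how a caller could use the new tuple return to redrive failed metrics on the next flush; the `flush_cycle` function and its ten-second interval are illustrative assumptions, not part of this commit:

use std::time::Duration;

// Hypothetical retry-aware flush cycle (names assumed for illustration).
// On failure, `flush`/`flush_with_retries` return the unsent batches; we
// feed them back in instead of draining the aggregator again.
async fn flush_cycle(mut flusher: Flusher) {
    let mut pending: Option<(
        Vec<crate::datadog::Series>,
        Vec<datadog_protos::metrics::SketchPayload>,
    )> = None;
    loop {
        tokio::time::sleep(Duration::from_secs(10)).await;
        pending = match pending.take() {
            // Redrive last cycle's failures.
            Some((series, sketches)) => {
                flusher
                    .flush_with_retries(Some(series), Some(sketches))
                    .await
            }
            // Nothing pending: collect fresh metrics from the aggregator.
            None => flusher.flush().await,
        };
    }
}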
