Skip to content

Commit bb54448

Browse files
gousluCopilot
andcommitted
engine(runtime): broadcast Shutdown to extensions on error path with bounded drain
Address review feedback on PR #2860: - runtime_pipeline.rs: refactor the run loop into an inner async block whose Result is captured, so cleanup runs on every exit path ("async finally"). Previously, on any error the loop did `return Err(e)` immediately and dropped already-started extensions without sending Shutdown — extensions owning sockets, files, or background work missed their cleanup signal. Now the outer block always calls broadcast_shutdown() (idempotent on the normal path) and drain_until_deadline().await before propagating the loop's result. Behavior on the normal path is unchanged. Refs: #2860 (comment) - extension_lifecycle.rs: add drain_until_deadline() that waits for remaining active+background extension tasks to finish but never past the broadcast deadline (plus a small slack to absorb the ControlChannel adapter's deferred Shutdown delivery and the task-return / JoinHandle-observed latency). A misbehaving extension that ignores Shutdown can no longer hang the pipeline indefinitely — it is dropped after EXTENSION_SHUTDOWN_GRACE + EXTENSION_SHUTDOWN_DRAIN_SLACK with a warning. broadcast_shutdown now also stashes the deadline so the drain stays consistent with what the wire-level message advertised. Refs: #2860 (comment) - extension_e2e.rs: regression test test_other_extensions_receive_shutdown_when_pipeline_errors — configures one failing extension and one shutdown-recording extension (each bound by its own probe receiver to survive defined-but-unbound pruning). When the failing extension's start() errors, the pipeline aborts but the recording extension must still observe Shutdown before run_forever returns. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent aa8d0f8 commit bb54448

3 files changed

Lines changed: 251 additions & 41 deletions

File tree

rust/otap-dataflow/crates/engine/src/extension_lifecycle.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,28 @@ use otap_df_telemetry::otel_warn;
3131
use otap_df_telemetry::reporter::MetricsReporter;
3232
use std::time::{Duration, Instant};
3333
use tokio::task::{JoinError, JoinHandle, LocalSet};
34+
use tokio::time::Instant as TokioInstant;
3435

3536
/// Cleanup window granted to extensions after the data path has
3637
/// drained. Extensions that don't terminate within this window will
3738
/// be left to the runtime's natural drop semantics when
3839
/// `run_forever` returns.
3940
pub(crate) const EXTENSION_SHUTDOWN_GRACE: Duration = Duration::from_secs(5);
4041

42+
/// Slack added past `EXTENSION_SHUTDOWN_GRACE` before the runtime
43+
/// hard-stops draining extension tasks. The cooperative deadline that
44+
/// `broadcast_shutdown` puts on the wire and the deadline the runtime
45+
/// waits on are intentionally different: extensions get exactly
46+
/// `EXTENSION_SHUTDOWN_GRACE` to wind down (matching the contract in
47+
/// the `Shutdown` message), and the runtime waits a small additional
48+
/// window to absorb the time it takes the
49+
/// [`crate::extension::ControlChannel`] adapter to deliver the
50+
/// `Shutdown` message through to the extension's `recv()` and for the
51+
/// extension's task to return. Without this slack a well-behaved
52+
/// extension that wakes up exactly at the deadline races against the
53+
/// runtime's drain timeout.
54+
pub(crate) const EXTENSION_SHUTDOWN_DRAIN_SLACK: Duration = Duration::from_millis(500);
55+
4156
const SHUTDOWN_REASON: &str = "pipeline data-path drained";
4257

4358
/// Holds the spawned extension tasks, control senders, and passive
@@ -57,6 +72,11 @@ pub(crate) struct ExtensionLifecycle {
5772
/// One-shot latch: `true` after `Shutdown` has been broadcast.
5873
/// Prevents re-firing on subsequent loop iterations.
5974
shutdown_broadcast_fired: bool,
75+
/// Deadline established when [`Self::broadcast_shutdown`] fires.
76+
/// Used by [`Self::drain_until_deadline`] to bound how long the
77+
/// runtime will wait for extensions to honour `Shutdown` so a
78+
/// misbehaving extension can't hang the pipeline indefinitely.
79+
shutdown_deadline: Option<Instant>,
6080
}
6181

6282
impl ExtensionLifecycle {
@@ -105,6 +125,7 @@ impl ExtensionLifecycle {
105125
shutdown_senders,
106126
_passive: passive,
107127
shutdown_broadcast_fired: false,
128+
shutdown_deadline: None,
108129
}
109130
}
110131

@@ -135,6 +156,7 @@ impl ExtensionLifecycle {
135156
self.shutdown_broadcast_fired = true;
136157

137158
let deadline = Instant::now() + EXTENSION_SHUTDOWN_GRACE;
159+
self.shutdown_deadline = Some(deadline);
138160
for sender in &self.shutdown_senders {
139161
// `try_send` is intentional: the extension's control
140162
// channel is a small mpsc and we don't want shutdown
@@ -148,4 +170,63 @@ impl ExtensionLifecycle {
148170
});
149171
}
150172
}
173+
174+
/// Drain remaining active+background extension tasks, but never
175+
/// past the shutdown deadline.
176+
///
177+
/// `Shutdown` is cooperative — extensions may ignore it or take
178+
/// longer than the grace window to exit. Without this bound, an
179+
/// extension that never returns from `start()` would hang the
180+
/// pipeline forever. After the deadline elapses, any still-running
181+
/// futures are dropped with a warning; the runtime's natural drop
182+
/// semantics take over once the lifecycle holder itself is dropped.
183+
///
184+
/// No-op if there are no remaining futures or if shutdown has not
185+
/// been broadcast (in which case there is no deadline yet).
186+
pub async fn drain_until_deadline(&mut self) {
187+
if self.futures.is_empty() {
188+
return;
189+
}
190+
// If the caller invokes drain without a prior broadcast there
191+
// is no deadline yet — synthesize one from the same grace
192+
// window so we still bound the wait.
193+
let deadline = self
194+
.shutdown_deadline
195+
.get_or_insert_with(|| Instant::now() + EXTENSION_SHUTDOWN_GRACE);
196+
// Wait slightly past the cooperative deadline to absorb the
197+
// time it takes the ControlChannel adapter to deliver the
198+
// queued `Shutdown` to the extension and for the extension's
199+
// task to return.
200+
let drain_deadline = TokioInstant::from_std(*deadline + EXTENSION_SHUTDOWN_DRAIN_SLACK);
201+
202+
let drain = async {
203+
while let Some(result) = self.futures.next().await {
204+
match result {
205+
Ok(Ok(())) => {}
206+
Ok(Err(e)) => {
207+
otel_warn!("extension.shutdown.task.error", error = format!("{e}"));
208+
}
209+
Err(e) => {
210+
otel_warn!(
211+
"extension.shutdown.task.join_error",
212+
is_canceled = e.is_cancelled(),
213+
is_panic = e.is_panic(),
214+
error = e.to_string()
215+
);
216+
}
217+
}
218+
}
219+
};
220+
221+
if tokio::time::timeout_at(drain_deadline, drain)
222+
.await
223+
.is_err()
224+
{
225+
otel_warn!(
226+
"extension.shutdown.timeout",
227+
grace_secs = EXTENSION_SHUTDOWN_GRACE.as_secs(),
228+
remaining = self.futures.len()
229+
);
230+
}
231+
}
151232
}

rust/otap-dataflow/crates/engine/src/runtime_pipeline.rs

Lines changed: 75 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -539,56 +539,90 @@ impl<PData: 'static + Debug + Clone + ReceivedAtNode + Unwindable + FlowMetricHo
539539
// `extension_lifecycle`) run concurrently. When the data-path
540540
// drains, broadcast `Shutdown` to extensions so they can
541541
// terminate gracefully, then continue draining extension
542-
// futures. Errors from either side short-circuit and abort.
542+
// futures.
543+
//
544+
// Errors from either side short-circuit out of the inner
545+
// async block. The outer code then unconditionally
546+
// broadcasts `Shutdown` and bounded-drains extensions before
547+
// propagating the error. This guarantees that extensions
548+
// owning sockets, files, or background work get the same
549+
// cleanup signal on the error path that they would on the
550+
// normal path, and that a misbehaving extension that ignores
551+
// `Shutdown` cannot hang the pipeline beyond
552+
// `EXTENSION_SHUTDOWN_GRACE`.
543553
rt.block_on(async {
544554
local_tasks
545555
.run_until(async {
546-
let mut task_results = Vec::new();
547-
548-
loop {
549-
// `biased;`: prefer data-path completions when both
550-
// arms are simultaneously ready. Functionally
551-
// optional — kept to make intent explicit.
552-
tokio::select! {
553-
biased;
554-
Some(result) = futures.next(), if !futures.is_empty() => {
555-
match result {
556-
Ok(Ok(res)) => task_results.push(res),
557-
Ok(Err(e)) => return Err(e),
558-
Err(e) => return Err(Error::JoinTaskError {
559-
is_canceled: e.is_cancelled(),
560-
is_panic: e.is_panic(),
561-
error: e.to_string(),
562-
}),
556+
// Inner async block isolates the loop's
557+
// `return Err(...)` short-circuits from the
558+
// outer cleanup, giving us an "async finally"
559+
// shape: cleanup always runs, then the loop's
560+
// result is propagated.
561+
let loop_result: Result<Vec<_>, Error> = async {
562+
let mut task_results = Vec::new();
563+
loop {
564+
// `biased;`: prefer data-path completions when both
565+
// arms are simultaneously ready. Functionally
566+
// optional — kept to make intent explicit.
567+
tokio::select! {
568+
biased;
569+
Some(result) = futures.next(), if !futures.is_empty() => {
570+
match result {
571+
Ok(Ok(res)) => task_results.push(res),
572+
Ok(Err(e)) => return Err(e),
573+
Err(e) => return Err(Error::JoinTaskError {
574+
is_canceled: e.is_cancelled(),
575+
is_panic: e.is_panic(),
576+
error: e.to_string(),
577+
}),
578+
}
563579
}
564-
}
565-
Some(result) = extension_lifecycle.next_completion(), if !extension_lifecycle.is_empty() => {
566-
match result {
567-
Ok(Ok(())) => {}
568-
Ok(Err(e)) => return Err(e),
569-
Err(e) => return Err(Error::JoinTaskError {
570-
is_canceled: e.is_cancelled(),
571-
is_panic: e.is_panic(),
572-
error: e.to_string(),
573-
}),
580+
Some(result) = extension_lifecycle.next_completion(), if !extension_lifecycle.is_empty() => {
581+
match result {
582+
Ok(Ok(())) => {}
583+
Ok(Err(e)) => return Err(e),
584+
Err(e) => return Err(Error::JoinTaskError {
585+
is_canceled: e.is_cancelled(),
586+
is_panic: e.is_panic(),
587+
error: e.to_string(),
588+
}),
589+
}
574590
}
591+
else => break,
575592
}
576-
else => break,
577-
}
578593

579-
// Once data-path is drained, fire the shutdown
580-
// broadcast exactly once. The lifecycle holder
581-
// gates on its own one-shot latch and uses a
582-
// local `now() + EXTENSION_SHUTDOWN_GRACE`
583-
// deadline — extensions shut down after every
584-
// data-path task has terminated, so this is
585-
// the start of a fresh extension cleanup
586-
// window, not a continuation of the
587-
// pipeline-wide deadline.
588-
if futures.is_empty() {
589-
extension_lifecycle.broadcast_shutdown();
594+
// Once data-path is drained, fire the shutdown
595+
// broadcast exactly once. The lifecycle holder
596+
// gates on its own one-shot latch and uses a
597+
// local `now() + EXTENSION_SHUTDOWN_GRACE`
598+
// deadline — extensions shut down after every
599+
// data-path task has terminated, so this is
600+
// the start of a fresh extension cleanup
601+
// window, not a continuation of the
602+
// pipeline-wide deadline.
603+
if futures.is_empty() {
604+
extension_lifecycle.broadcast_shutdown();
605+
}
590606
}
607+
Ok(task_results)
591608
}
609+
.await;
610+
611+
// "Async finally": unconditional cleanup that
612+
// runs on both the normal and error paths.
613+
// `broadcast_shutdown` is idempotent (latched by
614+
// `shutdown_broadcast_fired`), so a no-op on the
615+
// normal path; on the error path it ensures
616+
// already-started extensions still receive
617+
// `Shutdown` before the pipeline exits, so they
618+
// can release sockets, files, or background work
619+
// cleanly. `drain_until_deadline` then bounds the
620+
// wait so a misbehaving extension can't hang the
621+
// runtime.
622+
extension_lifecycle.broadcast_shutdown();
623+
extension_lifecycle.drain_until_deadline().await;
624+
625+
let task_results = loop_result?;
592626
let mut final_metrics_reporter = final_metrics_reporter.clone();
593627
if let Err(err) = report_node_metrics_with_handles(
594628
&final_node_metric_handles,

rust/otap-dataflow/crates/engine/tests/extension_e2e.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2239,6 +2239,101 @@ connections:
22392239
);
22402240
}
22412241

2242+
// ─────────────────────────────────────────────────────────────────────
2243+
// Error-path shutdown hygiene — when one active extension errors out,
2244+
// any *other* already-started extensions must still receive `Shutdown`
2245+
// before the pipeline returns, so they can release sockets, files, or
2246+
// background work cleanly. Regression test for review feedback on
2247+
// PR #2860 (discussion_r3228775534).
2248+
// ─────────────────────────────────────────────────────────────────────
2249+
2250+
#[test]
2251+
fn test_other_extensions_receive_shutdown_when_pipeline_errors() {
2252+
let receiver_key = "err-shutdown-recv";
2253+
let ext_probe_key = "err-shutdown-ext-probe";
2254+
let _probe = make_probe(receiver_key, CallSequence::Local);
2255+
2256+
let ext_shutdown_at: Arc<parking_lot::Mutex<Option<Instant>>> =
2257+
Arc::new(parking_lot::Mutex::new(None));
2258+
register_shutdown_recording_probe(
2259+
ext_probe_key,
2260+
ShutdownRecordingProbe {
2261+
shutdown_at: Arc::clone(&ext_shutdown_at),
2262+
},
2263+
);
2264+
2265+
// Both extensions must be bound by some node so neither gets
2266+
// pruned as "defined-but-unbound" before the run starts. We use
2267+
// two probe receivers, each binding a distinct extension, fanning
2268+
// into the same exporter. The failing extension errors at start
2269+
// → pipeline aborts → the recording extension must still receive
2270+
// `Shutdown` on the way out.
2271+
let receiver_b_key = "err-shutdown-recv-b";
2272+
let _probe_b = make_probe(receiver_b_key, CallSequence::Local);
2273+
let yaml = format!(
2274+
r#"
2275+
nodes:
2276+
receiver-a:
2277+
type: "{PROBE_RECEIVER_URN}"
2278+
config:
2279+
probe_key: "{receiver_key}"
2280+
capabilities:
2281+
no_op_stateless: "shutdown-rec"
2282+
receiver-b:
2283+
type: "{PROBE_RECEIVER_URN}"
2284+
config:
2285+
probe_key: "{receiver_b_key}"
2286+
capabilities:
2287+
no_op_stateless: "failing"
2288+
exporter:
2289+
type: "{NOOP_EXPORTER_URN}"
2290+
2291+
extensions:
2292+
shutdown-rec:
2293+
type: "{SHUTDOWN_RECORDING_EXTENSION_URN}"
2294+
config:
2295+
probe_key: "{ext_probe_key}"
2296+
failing:
2297+
type: "{FAILING_EXTENSION_URN}"
2298+
2299+
connections:
2300+
- from: receiver-a
2301+
to: exporter
2302+
- from: receiver-b
2303+
to: exporter
2304+
"#
2305+
);
2306+
let (runtime_pipeline, ctx, entity_key, ts) = build_test_runtime_pipeline(&yaml);
2307+
2308+
// Generous outer shutdown grace — the test must return well before
2309+
// it because the pipeline aborts on the failing extension's error
2310+
// and then bounded-drains the recording extension within
2311+
// `EXTENSION_SHUTDOWN_GRACE` (5s).
2312+
let started_at = Instant::now();
2313+
let result = run_pipeline_with_shutdown_after(
2314+
runtime_pipeline,
2315+
ctx,
2316+
entity_key,
2317+
ts,
2318+
Duration::from_secs(60),
2319+
);
2320+
let elapsed = started_at.elapsed();
2321+
2322+
assert!(
2323+
result.is_err(),
2324+
"pipeline should propagate the failing extension's error: {result:?}"
2325+
);
2326+
assert!(
2327+
elapsed < Duration::from_secs(10),
2328+
"pipeline should not hang after the error path; took {elapsed:?}"
2329+
);
2330+
let shutdown = ext_shutdown_at.lock();
2331+
assert!(
2332+
shutdown.is_some(),
2333+
"recording extension must receive Shutdown on the error path"
2334+
);
2335+
}
2336+
22422337
// ─────────────────────────────────────────────────────────────────────
22432338
// Test 5 — shutdown ordering: extension records Shutdown timestamp
22442339
// inside the pipeline run window

0 commit comments

Comments
 (0)