
Commit 93c34c6

feat: flow for loop squashing (#7107)

* feat: flow for loop optimization
* fix: make dedicated flows work with flow nodes
* fix: transform dedicated inputs + better error handling + differentiate squash from same worker
* fix builds
* fix build
* fix build
* make dedicated worker private
* update ee ref
* frontend nits
* nit
* add trace to dedicated worker
* update ee ref
* Update ee-repo-ref.txt

---------

Co-authored-by: windmill-internal-app[bot] <windmill-internal-app[bot]@users.noreply.github.com>
1 parent 79a0d3e commit 93c34c6

23 files changed, +548 −886 lines
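The headline change threads an optional `squash` flag through loop modules and a dedicated-runner channel through the queue. A minimal sketch of where the flag sits in a for-loop module's JSON; the `value` fields match the untagged deserializer changed in windmill-common/src/flows.rs below, while the surrounding module shape is an assumption for illustration:

// Hedged sketch: locate the new `squash` flag in a for-loop module's JSON.
// The payload fields (`type`, `iterator`, `skip_failures`, `parallel`,
// `squash`, `modules`) come from this commit's diff; the outer `{id, value}`
// wrapper is assumed for illustration. Requires the serde_json crate.
fn main() {
    let module = serde_json::json!({
        "id": "a",
        "value": {
            "type": "forloopflow",
            "iterator": { "type": "static", "value": [1, 2, 3] },
            "skip_failures": false,
            "parallel": false,
            "squash": true, // new optional flag; omitted from output when unset
            "modules": []
        }
    });
    assert_eq!(module["value"]["squash"], serde_json::json!(true));
}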

backend/ee-repo-ref.txt

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-1de62d3f0901aac6ef9ddd7595e1032be56b1938
+f9434efa127dadb0a04c87d6fc368cc289d93501

backend/src/monitor.rs

Lines changed: 8 additions & 6 deletions
@@ -2316,13 +2316,14 @@ async fn handle_zombie_jobs(db: &Pool<Postgres>, base_internal_url: &str, node_n
         );

         let error_message = format!(
-            "Job timed out after no ping from job since {} (ZOMBIE_JOB_TIMEOUT: {}, reason: {:?}).\nThis likely means that the job died on worker {}, OOM are a common reason for worker crashes.\nCheck the workers around the time of the last ping and the exit code if any.",
-            job.last_ping.unwrap_or_default(),
-            *ZOMBIE_JOB_TIMEOUT,
-            error_kind.to_string(),
-            job.worker.clone().unwrap_or_default(),
-        );
+            "Job timed out after no ping from job since {} (ZOMBIE_JOB_TIMEOUT: {}, reason: {:?}).\nThis likely means that the job died on worker {}, OOM are a common reason for worker crashes.\nCheck the workers around the time of the last ping and the exit code if any.",
+            job.last_ping.unwrap_or_default(),
+            *ZOMBIE_JOB_TIMEOUT,
+            error_kind.to_string(),
+            job.worker.clone().unwrap_or_default(),
+        );
         let memory_peak = job.memory_peak.unwrap_or(0);
+        let (_, killpill_rx_never_used) = KillpillSender::new(1);
         let _ = handle_job_error(
             db,
             &client,
@@ -2335,6 +2336,7 @@ async fn handle_zombie_jobs(db: &Pool<Postgres>, base_internal_url: &str, node_n
             "",
             node_name,
             send_result_never_used,
+            &killpill_rx_never_used,
             #[cfg(feature = "benchmark")]
             &mut windmill_common::bench::BenchmarkIter::new(),
         )

backend/tests/worker.rs

Lines changed: 2 additions & 0 deletions
@@ -217,6 +217,7 @@ async fn test_deno_flow(db: Pool<Postgres>) -> anyhow::Result<()> {
             iterator: InputTransform::Javascript { expr: "result".to_string() },
             skip_failures: false,
             parallel: false,
+            squash: None,
             parallelism: None,
             modules: vec![FlowModule {
                 id: "c".to_string(),
@@ -396,6 +397,7 @@ async fn test_deno_flow_same_worker(db: Pool<Postgres>) -> anyhow::Result<()> {
             iterator: InputTransform::Static { value: windmill_common::worker::to_raw_value(&[1, 2, 3]) },
             skip_failures: false,
             parallel: false,
+            squash: None,
             parallelism: None,
             modules: vec![
                 FlowModule {

backend/windmill-api/src/flows.rs

Lines changed: 1 addition & 0 deletions
@@ -1583,6 +1583,7 @@ mod tests {
                 skip_failures: true,
                 parallel: false,
                 parallelism: None,
+                squash: None,
             }),
             stop_after_if: Some(StopAfterIf {
                 expr: "previous.isEmpty()".to_string(),

backend/windmill-common/src/flows.rs

Lines changed: 7 additions & 0 deletions
@@ -869,6 +869,8 @@ pub enum FlowModuleValue {
         parallel: bool,
         #[serde(skip_serializing_if = "Option::is_none")]
         parallelism: Option<InputTransform>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        squash: Option<bool>,
     },

     /// While loop node
@@ -878,6 +880,8 @@ pub enum FlowModuleValue {
         modules_node: Option<FlowNodeId>,
         #[serde(default = "default_false")]
         skip_failures: bool,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        squash: Option<bool>,
     },

     /// Branch-one node
@@ -989,6 +993,7 @@ struct UntaggedFlowModuleValue {
     assets: Option<Vec<AssetWithAltAccessType>>,
     tools: Option<Vec<AgentTool>>,
     pass_flow_input_directly: Option<bool>,
+    squash: Option<bool>,
 }

 impl<'de> Deserialize<'de> for FlowModuleValue {
@@ -1027,13 +1032,15 @@ impl<'de> Deserialize<'de> for FlowModuleValue {
             skip_failures: untagged.skip_failures.unwrap_or(true),
             parallel: untagged.parallel.unwrap_or(false),
             parallelism: untagged.parallelism,
+            squash: untagged.squash,
         }),
         "whileloopflow" => Ok(FlowModuleValue::WhileloopFlow {
             modules: untagged
                 .modules
                 .ok_or_else(|| serde::de::Error::missing_field("modules"))?,
             modules_node: untagged.modules_node,
             skip_failures: untagged.skip_failures.unwrap_or(false),
+            squash: untagged.squash,
         }),
         "branchone" => Ok(FlowModuleValue::BranchOne {
             branches: untagged
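The deserializer change follows the file's existing untagged pattern: all optional fields across module kinds are collected into one helper struct, and the `type` string picks the variant, so a flow without `squash` deserializes exactly as before (`None`). A stripped-down sketch of that pattern, with illustrative types rather than the real ones:

// Minimal sketch of the untagged-struct dispatch used above. `Untagged` and
// `Value` are illustrative stand-ins, not the Windmill types. Requires the
// serde and serde_json crates.
use serde::Deserialize;

#[derive(Deserialize)]
struct Untagged {
    #[serde(rename = "type")]
    type_: String,
    parallel: Option<bool>,
    squash: Option<bool>,
}

#[derive(Debug)]
enum Value {
    ForloopFlow { parallel: bool, squash: Option<bool> },
}

fn parse(json: &str) -> Result<Value, Box<dyn std::error::Error>> {
    let u: Untagged = serde_json::from_str(json)?;
    match u.type_.as_str() {
        // A missing `squash` stays None, so pre-existing flows parse unchanged.
        "forloopflow" => Ok(Value::ForloopFlow {
            parallel: u.parallel.unwrap_or(false),
            squash: u.squash,
        }),
        other => Err(format!("unknown module type: {other}").into()),
    }
}

fn main() {
    println!("{:?}", parse(r#"{"type":"forloopflow","squash":true}"#).unwrap());
}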

backend/windmill-queue/src/jobs.rs

Lines changed: 79 additions & 1 deletion
@@ -24,6 +24,9 @@ use serde::{ser::SerializeMap, Serialize};
 use serde_json::{json, value::RawValue};
 use sqlx::{types::Json, Pool, Postgres, Transaction};
 use sqlx::{Encode, PgExecutor};
+use tokio::sync::mpsc::Sender;
+use tokio::sync::oneshot;
+use tokio::task::JoinHandle;
 use tokio::{sync::RwLock, time::sleep};
 use ulid::Ulid;
 use uuid::Uuid;
@@ -137,7 +140,7 @@ pub struct CanceledBy {
     pub reason: Option<String>,
 }

-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct JobCompleted {
     pub job: MiniCompletedJob,
     pub preprocessed_args: Option<HashMap<String, Box<RawValue>>>,
@@ -151,6 +154,10 @@ pub struct JobCompleted {
     pub duration: Option<i64>,
     pub has_stream: Option<bool>,
     pub from_cache: Option<bool>,
+    #[serde(skip)]
+    pub flow_runners: Option<Arc<FlowRunners>>,
+    #[serde(skip)]
+    pub done_tx: Option<oneshot::Sender<()>>,
 }

 pub async fn cancel_single_job<'c>(
@@ -2271,6 +2278,8 @@ pub struct JobAndPerms {
     pub parent_runnable_path: Option<String>,
     pub token: String,
     pub precomputed_agent_info: Option<PrecomputedAgentInfo>,
+    #[serde(skip)]
+    pub flow_runners: Option<Arc<FlowRunners>>,
 }
 impl PulledJob {
     pub async fn get_job_and_perms(self, db: &DB) -> JobAndPerms {
@@ -2302,6 +2311,7 @@ impl PulledJob {
             parent_runnable_path: self.parent_runnable_path,
             token,
             precomputed_agent_info: None,
+            flow_runners: None,
         }
     }
 }
@@ -2492,6 +2502,8 @@ impl PulledJobResult {
                 duration: None,
                 has_stream: Some(false),
                 from_cache: None,
+                flow_runners: None,
+                done_tx: None,
             }),
         ),
         PulledJobResult { job: Some(job), error_while_preprocessing: Some(e), .. } => Err(
@@ -2511,6 +2523,8 @@ impl PulledJobResult {
                 duration: None,
                 has_stream: Some(false),
                 from_cache: None,
+                flow_runners: None,
+                done_tx: None,
             }),
         ),
         PulledJobResult { job, .. } => Ok(job),
@@ -5715,10 +5729,74 @@ async fn restarted_flows_resolution(
     ))
 }

+
+// Wrapper struct to send both job and optional flow_runners to dedicated workers
+pub struct DedicatedWorkerJob {
+    pub job: Arc<MiniPulledJob>,
+    pub flow_runners: Option<Arc<FlowRunners>>,
+    pub done_tx: Option<oneshot::Sender<()>>,
+}
+
+#[derive(Debug)]
+pub struct FlowRunners {
+    pub runners: HashMap<String, Sender<DedicatedWorkerJob>>,
+    pub handles: Vec<JoinHandle<()>>,
+    pub job_id: Uuid,
+}
+
+impl Drop for FlowRunners {
+    fn drop(&mut self) {
+        let total_runners = self.handles.len();
+        tracing::info!("dropping {} flow runners for job {}", total_runners, self.job_id);
+
+        // First, drop all senders to signal workers to stop gracefully
+        self.runners.clear();
+
+        // Spawn a background task to wait with timeout and abort if needed
+        let handles = std::mem::take(&mut self.handles);
+        let job_id = self.job_id;
+
+        tokio::spawn(async move {
+            // Extract abort handles before consuming the join handles
+            let abort_handles: Vec<_> = handles.iter().map(|h| h.abort_handle()).collect();
+
+            // Wait up to 5 seconds for natural termination
+            let timeout_result = tokio::time::timeout(
+                tokio::time::Duration::from_secs(5),
+                futures::future::join_all(handles),
+            )
+            .await;
+
+            match timeout_result {
+                Ok(_) => {
+                    tracing::info!("all {} flow runners for job {} terminated gracefully", total_runners, job_id);
+                }
+                Err(_) => {
+                    // Timeout reached, abort only the handles that haven't finished
+                    let mut aborted = 0;
+                    for abort_handle in abort_handles {
+                        if !abort_handle.is_finished() {
+                            abort_handle.abort();
+                            aborted += 1;
+                        }
+                    }
+                    let graceful = total_runners - aborted;
+                    tracing::warn!(
+                        "flow runners for job {}: {} terminated gracefully, {} aborted after 5s timeout",
+                        job_id, graceful, aborted
+                    );
+                }
+            }
+        });
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct SameWorkerPayload {
     pub job_id: Uuid,
     pub recoverable: bool,
+    #[serde(skip)]
+    pub flow_runners: Option<Arc<FlowRunners>>,
 }

 pub async fn get_same_worker_job(
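The `Drop` impl above is a standard tokio shutdown recipe: clearing `runners` drops every `Sender`, each runner's `recv()` then returns `None` so its loop can drain and exit, and a spawned task grants a 5-second grace period before aborting stragglers. A self-contained sketch of the same recipe with a dummy task standing in for a dedicated worker loop:

// Sketch of graceful-then-forced task shutdown, assuming tokio with the
// "full" feature set. Not the Windmill worker loop, just the pattern.
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(16);
    let handle = tokio::spawn(async move {
        // Runs until every sender is dropped, then `recv` returns None.
        while let Some(n) = rx.recv().await {
            println!("processing {n}");
        }
        println!("channel closed, exiting gracefully");
    });

    tx.send(1).await.unwrap();
    drop(tx); // signal shutdown: no more senders exist

    // Take the abort handle before the JoinHandle is consumed by timeout().
    let abort = handle.abort_handle();
    if tokio::time::timeout(Duration::from_secs(5), handle).await.is_err() {
        // Grace period elapsed; force-stop whatever is still running.
        if !abort.is_finished() {
            abort.abort();
        }
    }
}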

backend/windmill-queue/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
  * LICENSE-AGPL for a copy of the license.
  */

-mod jobs;
+pub mod jobs;
 #[cfg(feature = "private")]
 pub mod jobs_ee;
 pub mod jobs_oss;

backend/windmill-worker/src/ai/tools.rs

Lines changed: 3 additions & 1 deletion
@@ -529,6 +529,7 @@ async fn execute_windmill_tool(
         &mut occupancy_metrics_spawn,
         &mut killpill_rx_spawn,
         None,
+        None,
         #[cfg(feature = "benchmark")]
         &mut bench_spawn,
     )
@@ -656,8 +657,9 @@ async fn handle_tool_execution_success(
         ..
     }) = send_result.as_ref()
     {
+        let result = result.clone();
         ctx.job_completed_tx
-            .send(send_result.as_ref().unwrap().result.clone(), true)
+            .send(send_result.unwrap().result, true)
             .await
             .map_err(to_anyhow)?;
         result

backend/windmill-worker/src/bun_executor.rs

Lines changed: 24 additions & 16 deletions
@@ -9,10 +9,8 @@ use serde_json::value::RawValue;

 use uuid::Uuid;
 use windmill_parser_ts::remove_pinned_imports;
-use windmill_queue::{append_logs, CanceledBy, MiniPulledJob, PrecomputedAgentInfo};

-#[cfg(feature = "enterprise")]
-use crate::common::build_envs_map;
+use windmill_queue::{append_logs, CanceledBy, MiniPulledJob, PrecomputedAgentInfo};

 use crate::{
     common::{
@@ -38,12 +36,6 @@ use tokio::{fs::File, process::Command};

 use tokio::io::AsyncReadExt;

-#[cfg(feature = "enterprise")]
-use tokio::sync::mpsc::Receiver;
-
-#[cfg(feature = "enterprise")]
-use windmill_common::variables;
-
 use windmill_common::{
     error::{self, Result},
     get_latest_hash_for_path,
@@ -1567,10 +1559,18 @@ pub async fn get_common_bun_proc_envs(base_internal_url: Option<&str>) -> HashMa
     return bun_envs;
 }

-#[cfg(feature = "enterprise")]
-use crate::{dedicated_worker::handle_dedicated_process, JobCompletedSender};
+#[cfg(feature = "private")]
+use crate::{
+    common::build_envs_map, dedicated_worker_oss::handle_dedicated_process, JobCompletedSender,
+};
+#[cfg(feature = "private")]
+use tokio::sync::mpsc::Receiver;
+#[cfg(feature = "private")]
+use windmill_common::variables;
+#[cfg(feature = "private")]
+use windmill_queue::DedicatedWorkerJob;

-#[cfg(feature = "enterprise")]
+#[cfg(feature = "private")]
 pub async fn start_worker(
     requirements_o: Option<String>,
     codebase: Option<String>,
@@ -1584,8 +1584,9 @@ pub async fn start_worker(
     script_path: &str,
     token: &str,
     job_completed_tx: JobCompletedSender,
-    jobs_rx: Receiver<std::sync::Arc<MiniPulledJob>>,
+    jobs_rx: Receiver<DedicatedWorkerJob>,
     killpill_rx: tokio::sync::broadcast::Receiver<()>,
+    client: windmill_common::client::AuthedClient,
 ) -> Result<()> {
     let mut logs = "".to_string();
     let mut mem_peak: i32 = 0;
@@ -1737,16 +1738,21 @@ BigInt.prototype.toJSON = function () {{

 console.log('start');

+function getArgs(line) {{
+    let {{ {spread} }} = JSON.parse(line)
+    {dates}
+    return [ {spread} ];
+}}
+
 for await (const line of Readline.createInterface({{ input: process.stdin }})) {{
     {print_lines}

     if (line === "end") {{
         process.exit(0);
     }}
     try {{
-        let {{ {spread} }} = JSON.parse(line)
-        {dates}
-        let res = await Main.main(...[ {spread} ]);
+        const args = getArgs(line);
+        const res = await Main.main(...args);
         console.log("wm_res[success]:" + JSON.stringify(res ?? null, (key, value) => typeof value === 'undefined' ? null : value));
     }} catch (e) {{
         console.log("wm_res[error]:" + JSON.stringify({{ message: e.message, name: e.name, stack: e.stack, line: line }}));
@@ -1812,6 +1818,7 @@ for await (const line of Readline.createInterface({{ input: process.stdin }})) {
             db,
             &script_path,
             "nodejs",
+            client,
         )
         .await
     } else {
@@ -1838,6 +1845,7 @@ for await (const line of Readline.createInterface({{ input: process.stdin }})) {
             db,
             script_path,
             "bun",
+            client,
         )
         .await
     }
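The generated wrapper above turns the Bun process into a long-lived dedicated worker speaking a line protocol: one JSON object of arguments per stdin line, the literal `end` to shut down, and replies on stdout prefixed with `wm_res[success]:` or `wm_res[error]:`. A hedged Rust sketch of a parent-side driver for that protocol; the real driver is `handle_dedicated_process`, and the file name and spawn details here are assumptions:

// Parent-side sketch of the dedicated-worker line protocol, std-only.
use std::io::{BufRead, BufReader, Write};
use std::process::{Command, Stdio};

fn main() -> std::io::Result<()> {
    // Hypothetical path to the generated shim; stands in for the real setup.
    let mut child = Command::new("bun")
        .args(["run", "wrapper.mjs"])
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;

    let mut stdin = child.stdin.take().unwrap();
    let stdout = BufReader::new(child.stdout.take().unwrap());
    let mut lines = stdout.lines();

    // One job: serialize the args as a single JSON line.
    writeln!(stdin, r#"{{"x": 41}}"#)?;

    // Scan child output for the result marker; unprefixed lines are job logs.
    while let Some(line) = lines.next().transpose()? {
        if let Some(res) = line.strip_prefix("wm_res[success]:") {
            println!("result: {res}");
            break;
        } else if let Some(err) = line.strip_prefix("wm_res[error]:") {
            eprintln!("error: {err}");
            break;
        }
    }

    writeln!(stdin, "end")?; // ask the shim to exit cleanly
    child.wait()?;
    Ok(())
}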
