Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 116 additions & 110 deletions src/main/services/TaskLifecycleService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { EventEmitter } from 'node:events';
import { spawn, type ChildProcess } from 'node:child_process';
import path from 'node:path';
import { promisify } from 'node:util';
import { lifecycleScriptsService } from './LifecycleScriptsService';
Expand All @@ -15,6 +14,7 @@ import {
import { getTaskEnvVars } from '@shared/task/envVars';
import { log } from '../lib/logger';
import { execFile } from 'node:child_process';
import { startLifecyclePty, type LifecyclePtyHandle } from './ptyManager';

const execFileAsync = promisify(execFile);

Expand All @@ -27,8 +27,8 @@ type LifecycleResult = {
class TaskLifecycleService extends EventEmitter {
private states = new Map<string, TaskLifecycleState>();
private logBuffers = new Map<string, LifecycleLogs>();
private runProcesses = new Map<string, ChildProcess>();
private finiteProcesses = new Map<string, Set<ChildProcess>>();
private runPtys = new Map<string, LifecyclePtyHandle>();
private finitePtys = new Map<string, Set<LifecyclePtyHandle>>();
private runStartInflight = new Map<string, Promise<LifecycleResult>>();
private setupInflight = new Map<string, Promise<LifecycleResult>>();
private teardownInflight = new Map<string, Promise<LifecycleResult>>();
Expand All @@ -42,42 +42,63 @@ class TaskLifecycleService extends EventEmitter {
return `${taskId}::${taskPath}`;
}

private killProcessTree(proc: ChildProcess, signal: NodeJS.Signals): void {
const pid = proc.pid;
if (!pid) return;

if (process.platform === 'win32') {
const args = ['/PID', String(pid), '/T'];
if (signal === 'SIGKILL') {
args.push('/F');
}
const killer = spawn('taskkill', args, { stdio: 'ignore' });
killer.unref();
return;
}

try {
// Detached shell commands run as their own process group.
process.kill(-pid, signal);
} catch {
proc.kill(signal);
}
}

private trackFiniteProcess(taskId: string, proc: ChildProcess): () => void {
const set = this.finiteProcesses.get(taskId) ?? new Set<ChildProcess>();
set.add(proc);
this.finiteProcesses.set(taskId, set);
private trackFinitePty(taskId: string, pty: LifecyclePtyHandle): () => void {
const set = this.finitePtys.get(taskId) ?? new Set<LifecyclePtyHandle>();
set.add(pty);
this.finitePtys.set(taskId, set);
return () => {
const current = this.finiteProcesses.get(taskId);
const current = this.finitePtys.get(taskId);
if (!current) return;
current.delete(proc);
current.delete(pty);
if (current.size === 0) {
this.finiteProcesses.delete(taskId);
this.finitePtys.delete(taskId);
}
};
}

private createLifecyclePty(
id: string,
script: string,
cwd: string,
env: NodeJS.ProcessEnv
): LifecyclePtyHandle {
return startLifecyclePty({
id,
command: script,
cwd,
env,
});
}

private waitForPtyExit(
handle: LifecyclePtyHandle,
isTracked: () => boolean,
timeoutMs: number,
timeoutMessage: string
): Promise<void> {
if (!isTracked()) {
return Promise.resolve();
}

return new Promise<void>((resolve) => {
let done = false;
const finish = () => {
if (done) return;
done = true;
clearTimeout(timer);
resolve();
};
const timer = setTimeout(() => {
log.warn(timeoutMessage);
finish();
}, timeoutMs);
handle.onExit(() => finish());
if (!isTracked()) {
finish();
}
});
}

private async resolveDefaultBranch(projectPath: string): Promise<string> {
try {
const { stdout } = await execFileAsync(
Expand Down Expand Up @@ -227,20 +248,19 @@ class TaskLifecycleService extends EventEmitter {
};
try {
const env = await this.buildLifecycleEnv(taskId, taskPath, projectPath, taskName);
const child = spawn(script, {
cwd: taskPath,
shell: true,
env,
detached: true,
});
const untrackFinite = this.trackFiniteProcess(taskId, child);
const onData = (buf: Buffer) => {
const line = buf.toString();
const pty = this.createLifecyclePty(
`lifecycle-${phase}-${taskId}`,
script,
taskPath,
env
);
const untrackFinite = this.trackFinitePty(taskId, pty);
pty.onData((line) => {
if (!this.finitePtys.get(taskId)?.has(pty)) return;
this.emitLifecycleEvent(taskId, phase, 'line', { line });
};
child.stdout?.on('data', onData);
child.stderr?.on('data', onData);
child.on('error', (error) => {
});
pty.onError((error) => {
if (!this.finitePtys.get(taskId)?.has(pty)) return;
untrackFinite();
const message = error?.message || String(error);
this.emitLifecycleEvent(taskId, phase, 'error', { error: message });
Expand All @@ -255,7 +275,8 @@ class TaskLifecycleService extends EventEmitter {
}
);
});
child.on('exit', (code) => {
pty.onExit((code) => {
if (!this.finitePtys.get(taskId)?.has(pty)) return;
untrackFinite();
const ok = code === 0;
this.emitLifecycleEvent(taskId, phase, ok ? 'done' : 'error', {
Expand Down Expand Up @@ -347,13 +368,8 @@ class TaskLifecycleService extends EventEmitter {
const script = lifecycleScriptsService.getScript(projectPath, 'run');
if (!script) return { ok: true, skipped: true };

const existing = this.runProcesses.get(taskId);
if (
existing &&
existing.exitCode === null &&
!existing.killed &&
!this.stopIntents.has(taskId)
) {
const existing = this.runPtys.get(taskId);
if (existing && !this.stopIntents.has(taskId)) {
return { ok: true, skipped: true };
}

Expand All @@ -373,24 +389,17 @@ class TaskLifecycleService extends EventEmitter {

try {
const env = await this.buildLifecycleEnv(taskId, taskPath, projectPath, taskName);
const child = spawn(script, {
cwd: taskPath,
shell: true,
env,
detached: true,
});
this.runProcesses.set(taskId, child);
state.run.pid = child.pid ?? null;
const pty = this.createLifecyclePty(`lifecycle-run-${taskId}`, script, taskPath, env);
this.runPtys.set(taskId, pty);
state.run.pid = pty.pid;

const onData = (buf: Buffer) => {
const line = buf.toString();
pty.onData((line) => {
if (this.runPtys.get(taskId) !== pty) return;
this.emitLifecycleEvent(taskId, 'run', 'line', { line });
};
child.stdout?.on('data', onData);
child.stderr?.on('data', onData);
child.on('error', (error) => {
if (this.runProcesses.get(taskId) !== child) return;
this.runProcesses.delete(taskId);
});
pty.onError((error) => {
if (this.runPtys.get(taskId) !== pty) return;
this.runPtys.delete(taskId);
this.stopIntents.delete(taskId);
const message = error?.message || String(error);
const cur = this.ensureState(taskId);
Expand All @@ -402,9 +411,9 @@ class TaskLifecycleService extends EventEmitter {
};
this.emitLifecycleEvent(taskId, 'run', 'error', { error: message });
});
child.on('exit', (code) => {
if (this.runProcesses.get(taskId) !== child) return;
this.runProcesses.delete(taskId);
pty.onExit((code) => {
if (this.runPtys.get(taskId) !== pty) return;
this.runPtys.delete(taskId);
const wasStopped = this.stopIntents.has(taskId);
this.stopIntents.delete(taskId);
const cur = this.ensureState(taskId);
Expand Down Expand Up @@ -435,16 +444,18 @@ class TaskLifecycleService extends EventEmitter {
}

stopRun(taskId: string): LifecycleResult {
const proc = this.runProcesses.get(taskId);
if (!proc) return { ok: true, skipped: true };
const pty = this.runPtys.get(taskId);
if (!pty) return { ok: true, skipped: true };

this.stopIntents.add(taskId);
try {
this.killProcessTree(proc, 'SIGTERM');
pty.kill();
setTimeout(() => {
const current = this.runProcesses.get(taskId);
if (!current || current !== proc) return;
this.killProcessTree(proc, 'SIGKILL');
const current = this.runPtys.get(taskId);
if (!current || current !== pty) return;
try {
current.kill('SIGKILL');
} catch {}
}, 8_000);
return { ok: true };
} catch (error) {
Expand Down Expand Up @@ -480,25 +491,16 @@ class TaskLifecycleService extends EventEmitter {
}

// Ensure a managed run process is stopped before teardown starts.
const existingRun = this.runProcesses.get(taskId);
const existingRun = this.runPtys.get(taskId);
if (existingRun) {
const waitForExit = this.waitForPtyExit(
existingRun,
() => this.runPtys.get(taskId) === existingRun,
10_000,
'Timed out waiting for run process to exit before teardown'
);
this.stopRun(taskId);
await new Promise<void>((resolve) => {
let done = false;
const finish = () => {
if (done) return;
done = true;
resolve();
};
const timer = setTimeout(() => {
log.warn('Timed out waiting for run process to exit before teardown', { taskId });
finish();
}, 10_000);
existingRun.once('exit', () => {
clearTimeout(timer);
finish();
});
});
await waitForExit;
}
return this.runFinite(taskId, taskPath, projectPath, 'teardown', taskName);
})().finally(() => {
Expand Down Expand Up @@ -537,41 +539,45 @@ class TaskLifecycleService extends EventEmitter {
}
}

const proc = this.runProcesses.get(taskId);
if (proc) {
const pty = this.runPtys.get(taskId);
if (pty) {
this.runPtys.delete(taskId);
try {
this.killProcessTree(proc, 'SIGTERM');
pty.kill();
} catch {}
this.runProcesses.delete(taskId);
}

const finite = this.finiteProcesses.get(taskId);
const finite = this.finitePtys.get(taskId);
if (finite) {
for (const child of finite) {
this.finitePtys.delete(taskId);
for (const handle of finite) {
try {
this.killProcessTree(child, 'SIGTERM');
handle.kill();
} catch {}
}
this.finiteProcesses.delete(taskId);
}
}

shutdown(): void {
for (const [taskId, proc] of this.runProcesses.entries()) {
const runPtys = [...this.runPtys.entries()];
const finitePtys = [...this.finitePtys.values()];

this.runPtys.clear();
this.finitePtys.clear();

for (const [taskId, pty] of runPtys) {
try {
this.stopIntents.add(taskId);
this.killProcessTree(proc, 'SIGTERM');
pty.kill();
} catch {}
}
for (const procs of this.finiteProcesses.values()) {
for (const proc of procs) {
for (const handles of finitePtys) {
for (const handle of handles) {
try {
this.killProcessTree(proc, 'SIGTERM');
handle.kill();
} catch {}
}
}
this.runProcesses.clear();
this.finiteProcesses.clear();
this.runStartInflight.clear();
this.setupInflight.clear();
this.teardownInflight.clear();
Expand Down
Loading
Loading