-
Notifications
You must be signed in to change notification settings - Fork 222
feat(pty): implement lifecycle PTY management for task execution #1364
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ import { | |
| import { getTaskEnvVars } from '@shared/task/envVars'; | ||
| import { log } from '../lib/logger'; | ||
| import { execFile } from 'node:child_process'; | ||
| import { startLifecyclePty, type LifecyclePtyHandle } from './ptyManager'; | ||
|
|
||
| const execFileAsync = promisify(execFile); | ||
|
|
||
|
|
@@ -28,6 +29,8 @@ class TaskLifecycleService extends EventEmitter { | |
| private states = new Map<string, TaskLifecycleState>(); | ||
| private logBuffers = new Map<string, LifecycleLogs>(); | ||
| private runProcesses = new Map<string, ChildProcess>(); | ||
| private runPtys = new Map<string, LifecyclePtyHandle>(); | ||
| private lifecyclePtys = new Map<string, LifecyclePtyHandle>(); | ||
| private finiteProcesses = new Map<string, Set<ChildProcess>>(); | ||
| private runStartInflight = new Map<string, Promise<LifecycleResult>>(); | ||
| private setupInflight = new Map<string, Promise<LifecycleResult>>(); | ||
|
|
@@ -227,48 +230,37 @@ class TaskLifecycleService extends EventEmitter { | |
| }; | ||
| try { | ||
| const env = await this.buildLifecycleEnv(taskId, taskPath, projectPath, taskName); | ||
| const child = spawn(script, { | ||
| cwd: taskPath, | ||
| shell: true, | ||
| env, | ||
| detached: true, | ||
| }); | ||
| const untrackFinite = this.trackFiniteProcess(taskId, child); | ||
| const onData = (buf: Buffer) => { | ||
| const line = buf.toString(); | ||
| const ptyId = `lifecycle-${phase}-${taskId}`; | ||
| let ptyHandle: LifecyclePtyHandle; | ||
| try { | ||
| ptyHandle = startLifecyclePty({ | ||
| id: ptyId, | ||
| command: script, | ||
| cwd: taskPath, | ||
| env, | ||
| }); | ||
| } catch (ptyError) { | ||
| ptyHandle = this.spawnWithFallback(ptyId, script, taskPath, env); | ||
| } | ||
| this.lifecyclePtys.set(ptyId, ptyHandle); | ||
| ptyHandle.onData((data) => { | ||
| const line = data; | ||
| this.emitLifecycleEvent(taskId, phase, 'line', { line }); | ||
| }; | ||
| child.stdout?.on('data', onData); | ||
| child.stderr?.on('data', onData); | ||
| child.on('error', (error) => { | ||
| untrackFinite(); | ||
| const message = error?.message || String(error); | ||
| this.emitLifecycleEvent(taskId, phase, 'error', { error: message }); | ||
| const detail = this.buildErrorDetail(taskId, phase, message); | ||
| finish( | ||
| { ok: false, error: detail }, | ||
| { | ||
| ...state[phase], | ||
| status: 'failed', | ||
| finishedAt: this.nowIso(), | ||
| error: message, | ||
| } | ||
| ); | ||
| }); | ||
| child.on('exit', (code) => { | ||
| untrackFinite(); | ||
| const ok = code === 0; | ||
| ptyHandle.onExit((exitCode, _signal) => { | ||
| this.lifecyclePtys.delete(ptyId); | ||
| const ok = exitCode === 0; | ||
| this.emitLifecycleEvent(taskId, phase, ok ? 'done' : 'error', { | ||
| exitCode: code, | ||
| ...(ok ? {} : { error: `Exited with code ${String(code)}` }), | ||
| exitCode, | ||
| ...(ok ? {} : { error: `Exited with code ${String(exitCode)}` }), | ||
| }); | ||
| const errorMsg = `Exited with code ${String(code)}`; | ||
| const errorMsg = `Exited with code ${String(exitCode)}`; | ||
| const detail = ok ? undefined : this.buildErrorDetail(taskId, phase, errorMsg); | ||
| finish(ok ? { ok: true } : { ok: false, error: detail }, { | ||
| ...state[phase], | ||
| status: ok ? 'succeeded' : 'failed', | ||
| finishedAt: this.nowIso(), | ||
| exitCode: code, | ||
| exitCode, | ||
| error: ok ? null : errorMsg, | ||
| }); | ||
| }); | ||
|
|
@@ -289,6 +281,46 @@ class TaskLifecycleService extends EventEmitter { | |
| }); | ||
| } | ||
|
|
||
| private spawnWithFallback( | ||
| id: string, | ||
| script: string, | ||
| cwd: string, | ||
| env: NodeJS.ProcessEnv | ||
| ): LifecyclePtyHandle { | ||
| const child = spawn(script, { | ||
| cwd, | ||
| shell: true, | ||
| env, | ||
| detached: true, | ||
| }); | ||
| this.trackFiniteProcess(id, child); | ||
| const dataCallbacks: ((data: string) => void)[] = []; | ||
| const exitCallbacks: ((exitCode: number | null, signal: string | null) => void)[] = []; | ||
|
|
||
| const onData = (buf: Buffer) => { | ||
| const line = buf.toString(); | ||
| for (const cb of dataCallbacks) { | ||
| cb(line); | ||
| } | ||
| }; | ||
| child.stdout?.on('data', onData); | ||
| child.stderr?.on('data', onData); | ||
|
|
||
| child.on('exit', (code) => { | ||
| for (const cb of exitCallbacks) { | ||
| cb(code, null); | ||
| } | ||
| }); | ||
|
|
||
| return { | ||
| onData: (cb) => dataCallbacks.push(cb), | ||
| onExit: (cb) => exitCallbacks.push(cb), | ||
| kill: () => { | ||
| this.killProcessTree(child, 'SIGTERM'); | ||
| }, | ||
| }; | ||
| } | ||
|
|
||
| async runSetup( | ||
| taskId: string, | ||
| taskPath: string, | ||
|
|
@@ -347,17 +379,11 @@ class TaskLifecycleService extends EventEmitter { | |
| const script = lifecycleScriptsService.getScript(projectPath, 'run'); | ||
| if (!script) return { ok: true, skipped: true }; | ||
|
|
||
| const existing = this.runProcesses.get(taskId); | ||
| if ( | ||
| existing && | ||
| existing.exitCode === null && | ||
| !existing.killed && | ||
| !this.stopIntents.has(taskId) | ||
| ) { | ||
| const existingPty = this.runPtys.get(taskId); | ||
| if (existingPty && !this.stopIntents.has(taskId)) { | ||
| return { ok: true, skipped: true }; | ||
| } | ||
|
|
||
| // Clear any residual stop intent so the new process's exit is not misclassified. | ||
| this.stopIntents.delete(taskId); | ||
|
|
||
| const state = this.ensureState(taskId); | ||
|
|
@@ -373,50 +399,39 @@ class TaskLifecycleService extends EventEmitter { | |
|
|
||
| try { | ||
| const env = await this.buildLifecycleEnv(taskId, taskPath, projectPath, taskName); | ||
| const child = spawn(script, { | ||
| cwd: taskPath, | ||
| shell: true, | ||
| env, | ||
| detached: true, | ||
| }); | ||
| this.runProcesses.set(taskId, child); | ||
| state.run.pid = child.pid ?? null; | ||
| const ptyId = `lifecycle-run-${taskId}`; | ||
| let ptyHandle: LifecyclePtyHandle; | ||
| try { | ||
| ptyHandle = startLifecyclePty({ | ||
| id: ptyId, | ||
| command: script, | ||
| cwd: taskPath, | ||
| env, | ||
| }); | ||
| } catch (ptyError) { | ||
| ptyHandle = this.spawnWithFallback(ptyId, script, taskPath, env); | ||
| } | ||
| this.runPtys.set(taskId, ptyHandle); | ||
|
|
||
| const onData = (buf: Buffer) => { | ||
| const line = buf.toString(); | ||
| ptyHandle.onData((data) => { | ||
| const line = data; | ||
| this.emitLifecycleEvent(taskId, 'run', 'line', { line }); | ||
| }; | ||
| child.stdout?.on('data', onData); | ||
| child.stderr?.on('data', onData); | ||
| child.on('error', (error) => { | ||
| if (this.runProcesses.get(taskId) !== child) return; | ||
| this.runProcesses.delete(taskId); | ||
| this.stopIntents.delete(taskId); | ||
| const message = error?.message || String(error); | ||
| const cur = this.ensureState(taskId); | ||
| cur.run = { | ||
| ...cur.run, | ||
| status: 'failed', | ||
| finishedAt: this.nowIso(), | ||
| error: message, | ||
| }; | ||
| this.emitLifecycleEvent(taskId, 'run', 'error', { error: message }); | ||
| }); | ||
| child.on('exit', (code) => { | ||
| if (this.runProcesses.get(taskId) !== child) return; | ||
| this.runProcesses.delete(taskId); | ||
| ptyHandle.onExit((exitCode, _signal) => { | ||
| if (this.runPtys.get(taskId) !== ptyHandle) return; | ||
| this.runPtys.delete(taskId); | ||
| const wasStopped = this.stopIntents.has(taskId); | ||
| this.stopIntents.delete(taskId); | ||
| const cur = this.ensureState(taskId); | ||
| cur.run = { | ||
| ...cur.run, | ||
| status: wasStopped ? 'idle' : code === 0 ? 'succeeded' : 'failed', | ||
| status: wasStopped ? 'idle' : exitCode === 0 ? 'succeeded' : 'failed', | ||
| finishedAt: this.nowIso(), | ||
| exitCode: code, | ||
| exitCode, | ||
| pid: null, | ||
| error: wasStopped || code === 0 ? null : `Exited with code ${String(code)}`, | ||
| error: wasStopped || exitCode === 0 ? null : `Exited with code ${String(exitCode)}`, | ||
| }; | ||
| this.emitLifecycleEvent(taskId, 'run', 'exit', { exitCode: code }); | ||
| this.emitLifecycleEvent(taskId, 'run', 'exit', { exitCode }); | ||
| }); | ||
|
|
||
| return { ok: true }; | ||
|
|
@@ -435,6 +450,28 @@ class TaskLifecycleService extends EventEmitter { | |
| } | ||
|
|
||
| stopRun(taskId: string): LifecycleResult { | ||
| const ptyHandle = this.runPtys.get(taskId); | ||
| if (ptyHandle) { | ||
| this.stopIntents.add(taskId); | ||
| try { | ||
| ptyHandle.kill(); | ||
| const cur = this.ensureState(taskId); | ||
| cur.run = { | ||
| ...cur.run, | ||
| status: 'idle', | ||
| finishedAt: this.nowIso(), | ||
| pid: null, | ||
| error: null, | ||
| }; | ||
| return { ok: true }; | ||
| } catch (error) { | ||
| this.stopIntents.delete(taskId); | ||
| const message = error instanceof Error ? error.message : String(error); | ||
| log.warn('Failed to stop run PTY', { taskId, error: message }); | ||
| return { ok: false, error: message }; | ||
| } | ||
|
Comment on lines
452
to
+472
Contributor
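There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Premature state update in `stopRun`: When `ptyHandle.kill()` returns, `stopRun` immediately sets the run state to `idle` (clearing `pid` and `error`), before the process has actually exited. Meanwhile, the `onExit` handler registered when the run was started also rewrites the run state and emits the `exit` event once the process really terminates. Consider updating state in the `onExit` handler only. |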
||
| } | ||
|
|
||
| const proc = this.runProcesses.get(taskId); | ||
| if (!proc) return { ok: true, skipped: true }; | ||
|
|
||
|
|
@@ -537,6 +574,14 @@ class TaskLifecycleService extends EventEmitter { | |
| } | ||
| } | ||
|
|
||
| const runPty = this.runPtys.get(taskId); | ||
| if (runPty) { | ||
| try { | ||
| runPty.kill(); | ||
| } catch {} | ||
| this.runPtys.delete(taskId); | ||
| } | ||
|
|
||
| const proc = this.runProcesses.get(taskId); | ||
| if (proc) { | ||
| try { | ||
|
|
@@ -557,6 +602,11 @@ class TaskLifecycleService extends EventEmitter { | |
| } | ||
|
|
||
| shutdown(): void { | ||
| for (const [, pty] of this.runPtys.entries()) { | ||
| try { | ||
| pty.kill(); | ||
| } catch {} | ||
| } | ||
| for (const [taskId, proc] of this.runProcesses.entries()) { | ||
| try { | ||
| this.stopIntents.add(taskId); | ||
|
|
@@ -570,6 +620,7 @@ class TaskLifecycleService extends EventEmitter { | |
| } catch {} | ||
| } | ||
| } | ||
| this.runPtys.clear(); | ||
| this.runProcesses.clear(); | ||
| this.finiteProcesses.clear(); | ||
| this.runStartInflight.clear(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1600,3 +1600,77 @@ export function getPtyKind(id: string): 'local' | 'ssh' | undefined { | |
| export function getPtyTmuxSessionName(id: string): string | undefined { | ||
| return ptys.get(id)?.tmuxSessionName; | ||
| } | ||
|
|
||
| export interface LifecyclePtyHandle { | ||
| onData: (callback: (data: string) => void) => void; | ||
| onExit: (callback: (exitCode: number | null, signal: string | null) => void) => void; | ||
| kill: () => void; | ||
| } | ||
|
|
||
| export function startLifecyclePty(options: { | ||
| id: string; | ||
| command: string; | ||
| cwd: string; | ||
| env?: NodeJS.ProcessEnv; | ||
| }): LifecyclePtyHandle { | ||
| if (process.env.EMDASH_DISABLE_PTY === '1') { | ||
| throw new Error('PTY disabled via EMDASH_DISABLE_PTY=1'); | ||
| } | ||
|
|
||
| const { id, command, cwd, env } = options; | ||
|
|
||
| const pty = require('node-pty'); | ||
|
|
||
| const defaultShell = getDefaultShell(); | ||
|
|
||
| const useEnv: Record<string, string> = { | ||
| TERM: 'xterm-256color', | ||
| COLORTERM: 'truecolor', | ||
| HOME: process.env.HOME || os.homedir(), | ||
| USER: process.env.USER || os.userInfo().username, | ||
| SHELL: defaultShell, | ||
| ...(process.env.LANG && { LANG: process.env.LANG }), | ||
| ...(process.env.TMPDIR && { TMPDIR: process.env.TMPDIR }), | ||
| ...(process.env.DISPLAY && { DISPLAY: process.env.DISPLAY }), | ||
| ...getDisplayEnv(), | ||
| ...(process.env.SSH_AUTH_SOCK && { SSH_AUTH_SOCK: process.env.SSH_AUTH_SOCK }), | ||
| ...(env || {}), | ||
| }; | ||
|
|
||
| const proc = pty.spawn(defaultShell, ['-ilc', command], { | ||
| name: 'xterm-256color', | ||
| cols: 120, | ||
| rows: 32, | ||
| cwd: cwd || os.homedir(), | ||
| env: useEnv, | ||
| }); | ||
|
Comment on lines
+1640
to
+1646
Contributor
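There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Interactive shell flags may interfere with script output: The PTY is spawned as `<defaultShell> -ilc <command>`, i.e. as an interactive login shell.
For non-interactive lifecycle scripts, the `-i` flag is probably unnecessary; a non-interactive invocation (for example `-lc` or `-c`) may be sufficient. |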
||
|
|
||
| const dataCallbacks: ((data: string) => void)[] = []; | ||
| const exitCallbacks: ((exitCode: number | null, signal: string | null) => void)[] = []; | ||
|
|
||
| proc.onData((data: string) => { | ||
| for (const cb of dataCallbacks) { | ||
| cb(data); | ||
| } | ||
| }); | ||
|
|
||
| proc.onExit(({ exitCode, signal }: { exitCode: number; signal: string }) => { | ||
| ptys.delete(id); | ||
| for (const cb of exitCallbacks) { | ||
| cb(exitCode, signal || null); | ||
| } | ||
| }); | ||
|
|
||
| ptys.set(id, { id, proc, cwd, kind: 'local', cols: 120, rows: 32 }); | ||
|
|
||
| return { | ||
| onData: (cb) => dataCallbacks.push(cb), | ||
| onExit: (cb) => exitCallbacks.push(cb), | ||
| kill: () => { | ||
| ptys.delete(id); | ||
| try { | ||
| proc.kill(); | ||
| } catch {} | ||
| }, | ||
| }; | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`spawnWithFallback` drops the `'error'` event handler. The original `spawn` path attached a `child.on('error', …)` listener that called `finish({ ok: false, … })` when the process failed to start (e.g., `ENOENT`, permission denied). `spawnWithFallback` omits this listener entirely.
If the shell process fails to spawn (or crashes at the OS level before writing to stdout/stderr), neither `exit` nor `data` will ever fire, and the `finish` callback inside `runFinite` will never be called.
This leaves the task's phase status stuck at `'running'` indefinitely and causes `setupInflight`/`teardownInflight` promises to hang forever.