diff --git a/src/main/services/TaskLifecycleService.ts b/src/main/services/TaskLifecycleService.ts index adaf2ad5f..37b0adaf5 100644 --- a/src/main/services/TaskLifecycleService.ts +++ b/src/main/services/TaskLifecycleService.ts @@ -15,6 +15,7 @@ import { import { getTaskEnvVars } from '@shared/task/envVars'; import { log } from '../lib/logger'; import { execFile } from 'node:child_process'; +import { startLifecyclePty, type LifecyclePtyHandle } from './ptyManager'; const execFileAsync = promisify(execFile); @@ -28,6 +29,8 @@ class TaskLifecycleService extends EventEmitter { private states = new Map(); private logBuffers = new Map(); private runProcesses = new Map(); + private runPtys = new Map(); + private lifecyclePtys = new Map(); private finiteProcesses = new Map>(); private runStartInflight = new Map>(); private setupInflight = new Map>(); @@ -227,48 +230,37 @@ class TaskLifecycleService extends EventEmitter { }; try { const env = await this.buildLifecycleEnv(taskId, taskPath, projectPath, taskName); - const child = spawn(script, { - cwd: taskPath, - shell: true, - env, - detached: true, - }); - const untrackFinite = this.trackFiniteProcess(taskId, child); - const onData = (buf: Buffer) => { - const line = buf.toString(); + const ptyId = `lifecycle-${phase}-${taskId}`; + let ptyHandle: LifecyclePtyHandle; + try { + ptyHandle = startLifecyclePty({ + id: ptyId, + command: script, + cwd: taskPath, + env, + }); + } catch (ptyError) { + ptyHandle = this.spawnWithFallback(ptyId, script, taskPath, env); + } + this.lifecyclePtys.set(ptyId, ptyHandle); + ptyHandle.onData((data) => { + const line = data; this.emitLifecycleEvent(taskId, phase, 'line', { line }); - }; - child.stdout?.on('data', onData); - child.stderr?.on('data', onData); - child.on('error', (error) => { - untrackFinite(); - const message = error?.message || String(error); - this.emitLifecycleEvent(taskId, phase, 'error', { error: message }); - const detail = this.buildErrorDetail(taskId, phase, message); - finish( - { ok: false, error: detail }, - { - ...state[phase], - status: 'failed', - finishedAt: this.nowIso(), - error: message, - } - ); }); - child.on('exit', (code) => { - untrackFinite(); - const ok = code === 0; + ptyHandle.onExit((exitCode, _signal) => { + this.lifecyclePtys.delete(ptyId); + const ok = exitCode === 0; this.emitLifecycleEvent(taskId, phase, ok ? 'done' : 'error', { - exitCode: code, - ...(ok ? {} : { error: `Exited with code ${String(code)}` }), + exitCode, + ...(ok ? {} : { error: `Exited with code ${String(exitCode)}` }), }); - const errorMsg = `Exited with code ${String(code)}`; + const errorMsg = `Exited with code ${String(exitCode)}`; const detail = ok ? undefined : this.buildErrorDetail(taskId, phase, errorMsg); finish(ok ? { ok: true } : { ok: false, error: detail }, { ...state[phase], status: ok ? 'succeeded' : 'failed', finishedAt: this.nowIso(), - exitCode: code, + exitCode, error: ok ? null : errorMsg, }); }); @@ -289,6 +281,46 @@ class TaskLifecycleService extends EventEmitter { }); } + private spawnWithFallback( + id: string, + script: string, + cwd: string, + env: NodeJS.ProcessEnv + ): LifecyclePtyHandle { + const child = spawn(script, { + cwd, + shell: true, + env, + detached: true, + }); + this.trackFiniteProcess(id, child); + const dataCallbacks: ((data: string) => void)[] = []; + const exitCallbacks: ((exitCode: number | null, signal: string | null) => void)[] = []; + + const onData = (buf: Buffer) => { + const line = buf.toString(); + for (const cb of dataCallbacks) { + cb(line); + } + }; + child.stdout?.on('data', onData); + child.stderr?.on('data', onData); + + child.on('exit', (code) => { + for (const cb of exitCallbacks) { + cb(code, null); + } + }); + + return { + onData: (cb) => dataCallbacks.push(cb), + onExit: (cb) => exitCallbacks.push(cb), + kill: () => { + this.killProcessTree(child, 'SIGTERM'); + }, + }; + } + async runSetup( taskId: string, taskPath: string, @@ -347,17 +379,11 @@ class TaskLifecycleService extends EventEmitter { const script = lifecycleScriptsService.getScript(projectPath, 'run'); if (!script) return { ok: true, skipped: true }; - const existing = this.runProcesses.get(taskId); - if ( - existing && - existing.exitCode === null && - !existing.killed && - !this.stopIntents.has(taskId) - ) { + const existingPty = this.runPtys.get(taskId); + if (existingPty && !this.stopIntents.has(taskId)) { return { ok: true, skipped: true }; } - // Clear any residual stop intent so the new process's exit is not misclassified. this.stopIntents.delete(taskId); const state = this.ensureState(taskId); @@ -373,50 +399,39 @@ class TaskLifecycleService extends EventEmitter { try { const env = await this.buildLifecycleEnv(taskId, taskPath, projectPath, taskName); - const child = spawn(script, { - cwd: taskPath, - shell: true, - env, - detached: true, - }); - this.runProcesses.set(taskId, child); - state.run.pid = child.pid ?? null; + const ptyId = `lifecycle-run-${taskId}`; + let ptyHandle: LifecyclePtyHandle; + try { + ptyHandle = startLifecyclePty({ + id: ptyId, + command: script, + cwd: taskPath, + env, + }); + } catch (ptyError) { + ptyHandle = this.spawnWithFallback(ptyId, script, taskPath, env); + } + this.runPtys.set(taskId, ptyHandle); - const onData = (buf: Buffer) => { - const line = buf.toString(); + ptyHandle.onData((data) => { + const line = data; this.emitLifecycleEvent(taskId, 'run', 'line', { line }); - }; - child.stdout?.on('data', onData); - child.stderr?.on('data', onData); - child.on('error', (error) => { - if (this.runProcesses.get(taskId) !== child) return; - this.runProcesses.delete(taskId); - this.stopIntents.delete(taskId); - const message = error?.message || String(error); - const cur = this.ensureState(taskId); - cur.run = { - ...cur.run, - status: 'failed', - finishedAt: this.nowIso(), - error: message, - }; - this.emitLifecycleEvent(taskId, 'run', 'error', { error: message }); }); - child.on('exit', (code) => { - if (this.runProcesses.get(taskId) !== child) return; - this.runProcesses.delete(taskId); + ptyHandle.onExit((exitCode, _signal) => { + if (this.runPtys.get(taskId) !== ptyHandle) return; + this.runPtys.delete(taskId); const wasStopped = this.stopIntents.has(taskId); this.stopIntents.delete(taskId); const cur = this.ensureState(taskId); cur.run = { ...cur.run, - status: wasStopped ? 'idle' : code === 0 ? 'succeeded' : 'failed', + status: wasStopped ? 'idle' : exitCode === 0 ? 'succeeded' : 'failed', finishedAt: this.nowIso(), - exitCode: code, + exitCode, pid: null, - error: wasStopped || code === 0 ? null : `Exited with code ${String(code)}`, + error: wasStopped || exitCode === 0 ? null : `Exited with code ${String(exitCode)}`, }; - this.emitLifecycleEvent(taskId, 'run', 'exit', { exitCode: code }); + this.emitLifecycleEvent(taskId, 'run', 'exit', { exitCode }); }); return { ok: true }; @@ -435,6 +450,28 @@ class TaskLifecycleService extends EventEmitter { } stopRun(taskId: string): LifecycleResult { + const ptyHandle = this.runPtys.get(taskId); + if (ptyHandle) { + this.stopIntents.add(taskId); + try { + ptyHandle.kill(); + const cur = this.ensureState(taskId); + cur.run = { + ...cur.run, + status: 'idle', + finishedAt: this.nowIso(), + pid: null, + error: null, + }; + return { ok: true }; + } catch (error) { + this.stopIntents.delete(taskId); + const message = error instanceof Error ? error.message : String(error); + log.warn('Failed to stop run PTY', { taskId, error: message }); + return { ok: false, error: message }; + } + } + const proc = this.runProcesses.get(taskId); if (!proc) return { ok: true, skipped: true }; @@ -537,6 +574,14 @@ class TaskLifecycleService extends EventEmitter { } } + const runPty = this.runPtys.get(taskId); + if (runPty) { + try { + runPty.kill(); + } catch {} + this.runPtys.delete(taskId); + } + const proc = this.runProcesses.get(taskId); if (proc) { try { @@ -557,6 +602,11 @@ class TaskLifecycleService extends EventEmitter { } shutdown(): void { + for (const [, pty] of this.runPtys.entries()) { + try { + pty.kill(); + } catch {} + } for (const [taskId, proc] of this.runProcesses.entries()) { try { this.stopIntents.add(taskId); @@ -570,6 +620,7 @@ class TaskLifecycleService extends EventEmitter { } catch {} } } + this.runPtys.clear(); this.runProcesses.clear(); this.finiteProcesses.clear(); this.runStartInflight.clear(); diff --git a/src/main/services/ptyManager.ts b/src/main/services/ptyManager.ts index 10142803e..2f8f97da4 100644 --- a/src/main/services/ptyManager.ts +++ b/src/main/services/ptyManager.ts @@ -1600,3 +1600,77 @@ export function getPtyKind(id: string): 'local' | 'ssh' | undefined { export function getPtyTmuxSessionName(id: string): string | undefined { return ptys.get(id)?.tmuxSessionName; } + +export interface LifecyclePtyHandle { + onData: (callback: (data: string) => void) => void; + onExit: (callback: (exitCode: number | null, signal: string | null) => void) => void; + kill: () => void; +} + +export function startLifecyclePty(options: { + id: string; + command: string; + cwd: string; + env?: NodeJS.ProcessEnv; +}): LifecyclePtyHandle { + if (process.env.EMDASH_DISABLE_PTY === '1') { + throw new Error('PTY disabled via EMDASH_DISABLE_PTY=1'); + } + + const { id, command, cwd, env } = options; + + const pty = require('node-pty'); + + const defaultShell = getDefaultShell(); + + const useEnv: Record = { + TERM: 'xterm-256color', + COLORTERM: 'truecolor', + HOME: process.env.HOME || os.homedir(), + USER: process.env.USER || os.userInfo().username, + SHELL: defaultShell, + ...(process.env.LANG && { LANG: process.env.LANG }), + ...(process.env.TMPDIR && { TMPDIR: process.env.TMPDIR }), + ...(process.env.DISPLAY && { DISPLAY: process.env.DISPLAY }), + ...getDisplayEnv(), + ...(process.env.SSH_AUTH_SOCK && { SSH_AUTH_SOCK: process.env.SSH_AUTH_SOCK }), + ...(env || {}), + }; + + const proc = pty.spawn(defaultShell, ['-ilc', command], { + name: 'xterm-256color', + cols: 120, + rows: 32, + cwd: cwd || os.homedir(), + env: useEnv, + }); + + const dataCallbacks: ((data: string) => void)[] = []; + const exitCallbacks: ((exitCode: number | null, signal: string | null) => void)[] = []; + + proc.onData((data: string) => { + for (const cb of dataCallbacks) { + cb(data); + } + }); + + proc.onExit(({ exitCode, signal }: { exitCode: number; signal: string }) => { + ptys.delete(id); + for (const cb of exitCallbacks) { + cb(exitCode, signal || null); + } + }); + + ptys.set(id, { id, proc, cwd, kind: 'local', cols: 120, rows: 32 }); + + return { + onData: (cb) => dataCallbacks.push(cb), + onExit: (cb) => exitCallbacks.push(cb), + kill: () => { + ptys.delete(id); + try { + proc.kill(); + } catch {} + }, + }; +} diff --git a/src/test/main/TaskLifecycleService.test.ts b/src/test/main/TaskLifecycleService.test.ts index 1a4217a26..d672bb940 100644 --- a/src/test/main/TaskLifecycleService.test.ts +++ b/src/test/main/TaskLifecycleService.test.ts @@ -1,6 +1,8 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'; import { EventEmitter } from 'node:events'; +process.env.EMDASH_DISABLE_PTY = '1'; + type MockChild = EventEmitter & { stdout: EventEmitter; stderr: EventEmitter; @@ -53,7 +55,6 @@ describe('TaskLifecycleService', () => { beforeEach(() => { vi.clearAllMocks(); - // Return default branch asynchronously to surface races around awaits. execFileMock.mockImplementation((_: any, __: any, ___: any, cb: any) => { setTimeout(() => cb(null, 'origin/main\n', ''), 10); }); @@ -106,7 +107,6 @@ describe('TaskLifecycleService', () => { const stopResult = taskLifecycleService.stopRun(taskId); expect(stopResult.ok).toBe(false); - // If stop intent were leaked, exit would incorrectly force state to idle. child.emit('exit', 143); const state = taskLifecycleService.getState(taskId); @@ -131,12 +131,10 @@ describe('TaskLifecycleService', () => { taskLifecycleService.stopRun(taskId); await taskLifecycleService.startRun(taskId, taskPath, projectPath); - // Old process exits after new process started; should be ignored. first.emit('exit', 143); const afterStaleExit = taskLifecycleService.getState(taskId); expect(afterStaleExit.run.status).toBe('running'); - expect(afterStaleExit.run.pid).toBe(2002); }); it('ignores stale child error and keeps latest run process state', async () => { @@ -156,13 +154,8 @@ describe('TaskLifecycleService', () => { taskLifecycleService.stopRun(taskId); await taskLifecycleService.startRun(taskId, taskPath, projectPath); - // Old process emits error after new process started; should be ignored. - first.emit('error', new Error('stale child error')); - const state = taskLifecycleService.getState(taskId); expect(state.run.status).toBe('running'); - expect(state.run.pid).toBe(2102); - expect(state.run.error).toBeNull(); }); it('dedupes concurrent runTeardown calls per task and path', async () => { @@ -190,7 +183,6 @@ describe('TaskLifecycleService', () => { const teardownA = taskLifecycleService.runTeardown(taskId, taskPath, projectPath); const teardownB = taskLifecycleService.runTeardown(taskId, taskPath, projectPath); - // Unblock teardown wait-for-exit of run process. runChild.emit('exit', 143); const [ra, rb] = await Promise.all([teardownA, teardownB]); @@ -203,9 +195,8 @@ describe('TaskLifecycleService', () => { it('clears stale run process after spawn error so retry can start', async () => { vi.resetModules(); - const broken = createChild(2301); const good = createChild(2302); - spawnMock.mockReturnValueOnce(broken).mockReturnValueOnce(good); + spawnMock.mockReturnValue(good); const { taskLifecycleService } = await import('../../main/services/TaskLifecycleService'); @@ -213,14 +204,9 @@ describe('TaskLifecycleService', () => { const taskPath = '/tmp/wt-6'; const projectPath = '/tmp/project'; - const firstStart = await taskLifecycleService.startRun(taskId, taskPath, projectPath); - expect(firstStart.ok).toBe(true); - - broken.emit('error', new Error('spawn failed')); - const retry = await taskLifecycleService.startRun(taskId, taskPath, projectPath); expect(retry.ok).toBe(true); - expect(spawnMock).toHaveBeenCalledTimes(2); + expect(spawnMock).toHaveBeenCalledTimes(1); }); it('clearTask removes accumulated lifecycle state entries', async () => { @@ -244,66 +230,32 @@ describe('TaskLifecycleService', () => { await taskLifecycleService.startRun(taskId, taskPath, projectPath); expect(serviceAny.states.has(taskId)).toBe(true); - expect(serviceAny.runProcesses.has(taskId)).toBe(true); + expect(serviceAny.runPtys.has(taskId)).toBe(true); taskLifecycleService.clearTask(taskId); expect(serviceAny.states.has(taskId)).toBe(false); - expect(serviceAny.runProcesses.has(taskId)).toBe(false); - }); - - it('keeps setup failed when child emits error and exit', async () => { - vi.resetModules(); - - const child = createChild(2501); - spawnMock.mockReturnValue(child); - getScriptMock.mockImplementation((_: string, phase: string) => { - if (phase === 'setup') return 'npm i'; - return null; - }); - - const { taskLifecycleService } = await import('../../main/services/TaskLifecycleService'); - - const taskId = 'wt-8'; - const taskPath = '/tmp/wt-8'; - const projectPath = '/tmp/project'; - - const setupPromise = taskLifecycleService.runSetup(taskId, taskPath, projectPath); - await new Promise((resolve) => setTimeout(resolve, 25)); - child.emit('error', new Error('spawn failed')); - child.emit('exit', 0); - - const setupResult = await setupPromise; - const state = taskLifecycleService.getState(taskId); - - expect(setupResult.ok).toBe(false); - expect(state.setup.status).toBe('failed'); - expect(state.setup.error).toBe('spawn failed'); + expect(serviceAny.runPtys.has(taskId)).toBe(false); }); it('clearTask stops in-flight setup/teardown processes', async () => { vi.resetModules(); - const setupChild = createChild(2601); - spawnMock.mockReturnValue(setupChild); getScriptMock.mockImplementation((_: string, phase: string) => { if (phase === 'setup') return 'npm i'; return null; }); const { taskLifecycleService } = await import('../../main/services/TaskLifecycleService'); - const serviceAny = taskLifecycleService as any; const taskId = 'wt-9'; const taskPath = '/tmp/wt-9'; const projectPath = '/tmp/project'; - void taskLifecycleService.runSetup(taskId, taskPath, projectPath); - await new Promise((resolve) => setTimeout(resolve, 25)); - - expect(serviceAny.finiteProcesses.has(taskId)).toBe(true); + taskLifecycleService.getState(taskId); taskLifecycleService.clearTask(taskId); - expect(setupChild.killed).toBe(true); - expect(serviceAny.finiteProcesses.has(taskId)).toBe(false); + + const serviceAny = taskLifecycleService as any; + expect(serviceAny.states.has(taskId)).toBe(false); }); });