diff --git a/.gitignore b/.gitignore index 24eca450f..3cc15b99a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ # Maestro TEMP-REFACTORING-PLAN.md TEMP-REFACTORING-PLAN-2.md +CUE-NOTES.md +CUE-REFACTORING.md Auto\ Run\ Docs/ Work\ Trees/ community-data/ diff --git a/src/__tests__/main/cue/cue-concurrency.test.ts b/src/__tests__/main/cue/cue-concurrency.test.ts index 5a1b92d5b..758e1aee2 100644 --- a/src/__tests__/main/cue/cue-concurrency.test.ts +++ b/src/__tests__/main/cue/cue-concurrency.test.ts @@ -633,4 +633,203 @@ describe('CueEngine Concurrency Control', () => { engine.stop(); }); }); + + describe('stress scenarios', () => { + it('processes multiple queued events in FIFO order', async () => { + const resolvers: ((val: CueRunResult) => void)[] = []; + const callOrder: number[] = []; + let callCount = 0; + + const deps = createMockDeps({ + onCueRun: vi.fn(() => { + const idx = callCount++; + callOrder.push(idx); + return new Promise((resolve) => { + resolvers.push(resolve); + }); + }), + }); + + const config = createMockConfig({ + settings: { + timeout_minutes: 30, + timeout_on_fail: 'break', + max_concurrent: 1, + queue_size: 10, + }, + subscriptions: [ + { + name: 'timer', + event: 'time.heartbeat', + enabled: true, + prompt: 'test', + interval_minutes: 1, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const engine = new CueEngine(deps); + engine.start(); + + // Initial heartbeat occupies the slot + await vi.advanceTimersByTimeAsync(10); + expect(deps.onCueRun).toHaveBeenCalledTimes(1); + + // Queue several events via timer advances + vi.advanceTimersByTime(1 * 60 * 1000); // queued: 1 + vi.advanceTimersByTime(1 * 60 * 1000); // queued: 2 + vi.advanceTimersByTime(1 * 60 * 1000); // queued: 3 + expect(engine.getQueueStatus().get('session-1')).toBe(3); + expect(deps.onCueRun).toHaveBeenCalledTimes(1); // still only 1 dispatched + + // Resolve the first run -> drain dispatches next queued event + const makeResult = (runId: string): CueRunResult => ({ + runId, + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'timer', + event: {} as CueEvent, + status: 'completed', + stdout: '', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }); + + resolvers[0](makeResult('r0')); + await vi.advanceTimersByTimeAsync(10); + expect(deps.onCueRun).toHaveBeenCalledTimes(2); + + // Resolve second -> third dispatched + resolvers[1](makeResult('r1')); + await vi.advanceTimersByTimeAsync(10); + expect(deps.onCueRun).toHaveBeenCalledTimes(3); + + // Resolve third -> fourth dispatched + resolvers[2](makeResult('r2')); + await vi.advanceTimersByTimeAsync(10); + expect(deps.onCueRun).toHaveBeenCalledTimes(4); + + // Verify events dispatched in sequential order + expect(callOrder).toEqual([0, 1, 2, 3]); + + engine.stopAll(); + engine.stop(); + }); + + it('all queued events become stale and are dropped', async () => { + let resolveRun: ((val: CueRunResult) => void) | undefined; + const deps = createMockDeps({ + onCueRun: vi.fn( + () => + new Promise((resolve) => { + resolveRun = resolve; + }) + ), + }); + + const config = createMockConfig({ + settings: { + timeout_minutes: 1, // 1 minute timeout + timeout_on_fail: 'break', + max_concurrent: 1, + queue_size: 10, + }, + subscriptions: [ + { + name: 'timer', + event: 'time.heartbeat', + enabled: true, + prompt: 'test', + interval_minutes: 1, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const engine = new CueEngine(deps); + engine.start(); + + // First run occupies the slot + await vi.advanceTimersByTimeAsync(10); + expect(deps.onCueRun).toHaveBeenCalledTimes(1); + + // Queue several events + vi.advanceTimersByTime(1 * 60 * 1000); // queued: 1 + vi.advanceTimersByTime(1 * 60 * 1000); // queued: 2 + expect(engine.getQueueStatus().get('session-1')).toBe(2); + + // Advance time past timeout so all queued events become stale (>1 minute) + vi.advanceTimersByTime(3 * 60 * 1000); + + // Resolve the in-flight run -> drain should evict all queued events as stale + resolveRun!({ + runId: 'r1', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'timer', + event: {} as CueEvent, + status: 'completed', + stdout: '', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }); + await vi.advanceTimersByTimeAsync(10); + + // All stale events should have been dropped + expect(deps.onLog).toHaveBeenCalledWith( + 'cue', + expect.stringContaining('Dropping stale queued event') + ); + + engine.stopAll(); + engine.stop(); + }); + + it('enable/disable toggle during queue drain does not crash', async () => { + const deps = createMockDeps({ + onCueRun: vi.fn(() => new Promise(() => {})), + }); + + const config = createMockConfig({ + settings: { + timeout_minutes: 30, + timeout_on_fail: 'break', + max_concurrent: 1, + queue_size: 10, + }, + subscriptions: [ + { + name: 'timer', + event: 'time.heartbeat', + enabled: true, + prompt: 'test', + interval_minutes: 1, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const engine = new CueEngine(deps); + engine.start(); + + await vi.advanceTimersByTimeAsync(10); + + // Queue some events + vi.advanceTimersByTime(1 * 60 * 1000); + vi.advanceTimersByTime(1 * 60 * 1000); + expect(engine.getQueueStatus().get('session-1')).toBe(2); + + // Stop engine mid-drain — should not crash + expect(() => { + engine.stop(); + }).not.toThrow(); + + // Queue should be cleared + expect(engine.getQueueStatus().size).toBe(0); + }); + }); }); diff --git a/src/__tests__/main/cue/cue-engine.test.ts b/src/__tests__/main/cue/cue-engine.test.ts index 3bb6b7bc3..225399e4e 100644 --- a/src/__tests__/main/cue/cue-engine.test.ts +++ b/src/__tests__/main/cue/cue-engine.test.ts @@ -2142,4 +2142,603 @@ describe('CueEngine', () => { engine.stop(); }); }); + + describe('output prompt separate runId (Fix 5)', () => { + it('output prompt uses a different runId from main run', async () => { + const mainResult: CueRunResult = { + runId: 'run-main', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'timer', + event: {} as CueEvent, + status: 'completed', + stdout: 'main output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }; + const outputResult: CueRunResult = { + ...mainResult, + runId: 'run-output', + stdout: 'formatted output', + }; + const onCueRun = vi + .fn() + .mockResolvedValueOnce(mainResult) + .mockResolvedValueOnce(outputResult); + + const config = createMockConfig({ + subscriptions: [ + { + name: 'timer', + event: 'time.heartbeat', + enabled: true, + prompt: 'do work', + output_prompt: 'format results', + interval_minutes: 60, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const deps = createMockDeps({ onCueRun }); + const engine = new CueEngine(deps); + engine.start(); + + await vi.advanceTimersByTimeAsync(100); + + expect(onCueRun).toHaveBeenCalledTimes(2); + const firstRunId = onCueRun.mock.calls[0][0].runId; + const secondRunId = onCueRun.mock.calls[1][0].runId; + expect(firstRunId).not.toBe(secondRunId); + + engine.stop(); + }); + + it('output prompt timeout falls back to main output', async () => { + const mainResult: CueRunResult = { + runId: 'run-main', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'timer', + event: {} as CueEvent, + status: 'completed', + stdout: 'main-output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }; + const timeoutResult: CueRunResult = { + ...mainResult, + runId: 'run-output', + status: 'timeout', + stdout: '', + }; + const onCueRun = vi + .fn() + .mockResolvedValueOnce(mainResult) + .mockResolvedValueOnce(timeoutResult); + + const config = createMockConfig({ + subscriptions: [ + { + name: 'timer', + event: 'time.heartbeat', + enabled: true, + prompt: 'do work', + output_prompt: 'format results', + interval_minutes: 60, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const deps = createMockDeps({ onCueRun }); + const engine = new CueEngine(deps); + engine.start(); + + await vi.advanceTimersByTimeAsync(100); + + // Activity log entry should have the main output (fallback) + const log = engine.getActivityLog(); + expect(log[0].stdout).toBe('main-output'); + + engine.stop(); + }); + + it('output prompt event includes outputPromptPhase: true', async () => { + const mainResult: CueRunResult = { + runId: 'run-main', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'timer', + event: {} as CueEvent, + status: 'completed', + stdout: 'main output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }; + const outputResult: CueRunResult = { + ...mainResult, + runId: 'run-output', + stdout: 'formatted', + }; + const onCueRun = vi + .fn() + .mockResolvedValueOnce(mainResult) + .mockResolvedValueOnce(outputResult); + + const config = createMockConfig({ + subscriptions: [ + { + name: 'timer', + event: 'time.heartbeat', + enabled: true, + prompt: 'do work', + output_prompt: 'format results', + interval_minutes: 60, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const deps = createMockDeps({ onCueRun }); + const engine = new CueEngine(deps); + engine.start(); + + await vi.advanceTimersByTimeAsync(100); + + expect(onCueRun).toHaveBeenCalledTimes(2); + const secondCallEvent = onCueRun.mock.calls[1][0].event; + expect(secondCallEvent.payload.outputPromptPhase).toBe(true); + + engine.stop(); + }); + + it('completion chain receives output prompt stdout when successful', async () => { + // Session A has heartbeat + output_prompt; Session B watches session A via agent.completed + const sessionA = createMockSession({ + id: 'session-a', + name: 'Agent A', + projectRoot: '/proj/a', + }); + const sessionB = createMockSession({ + id: 'session-b', + name: 'Agent B', + projectRoot: '/proj/b', + }); + + const configA = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat-a', + event: 'time.heartbeat', + enabled: true, + prompt: 'do work', + output_prompt: 'format nicely', + interval_minutes: 60, + }, + ], + }); + const configB = createMockConfig({ + subscriptions: [ + { + name: 'chain-b', + event: 'agent.completed', + enabled: true, + prompt: 'react to A', + source_session: 'Agent A', + }, + ], + }); + + mockLoadCueConfig.mockImplementation((root: string) => { + if (root === '/proj/a') return configA; + if (root === '/proj/b') return configB; + return null; + }); + + let runCount = 0; + const onCueRun = vi.fn(async (request: Parameters[0]) => { + runCount++; + const result: CueRunResult = { + runId: `run-${runCount}`, + sessionId: request.sessionId, + sessionName: request.sessionId === 'session-a' ? 'Agent A' : 'Agent B', + subscriptionName: request.subscriptionName, + event: request.event, + status: 'completed' as const, + stdout: runCount === 1 ? 'raw' : runCount === 2 ? 'formatted' : 'chain-output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }; + return result; + }); + + const deps = createMockDeps({ + getSessions: vi.fn(() => [sessionA, sessionB]), + onCueRun, + }); + + const engine = new CueEngine(deps); + engine.start(); + + // Let the heartbeat fire (immediate) + output prompt + completion chain + await vi.advanceTimersByTimeAsync(100); + + // Session B's agent.completed event should have sourceOutput from the output prompt (formatted) + const chainCall = (onCueRun as ReturnType).mock.calls.find( + (call: unknown[]) => + (call[0] as { subscriptionName: string }).subscriptionName === 'chain-b' + ); + expect(chainCall).toBeDefined(); + expect((chainCall![0] as { event: CueEvent }).event.payload.sourceOutput).toContain( + 'formatted' + ); + + engine.stop(); + }); + }); + + describe('configuration hotloading', () => { + it('new subscription fires after hot reload', async () => { + const config1 = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat-1', + event: 'time.heartbeat', + enabled: true, + prompt: 'first', + interval_minutes: 60, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config1); + const deps = createMockDeps(); + const engine = new CueEngine(deps); + + // Capture the watchCueYaml onChange callback + let capturedOnChange: (() => void) | undefined; + mockWatchCueYaml.mockImplementation((_root: string, cb: () => void) => { + capturedOnChange = cb; + return vi.fn(); + }); + + engine.start(); + await vi.advanceTimersByTimeAsync(0); + vi.clearAllMocks(); + + // Update config to have 2 subscriptions + const config2 = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat-1', + event: 'time.heartbeat', + enabled: true, + prompt: 'first', + interval_minutes: 60, + }, + { + name: 'heartbeat-2', + event: 'time.heartbeat', + enabled: true, + prompt: 'second', + interval_minutes: 60, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config2); + + // Invoke onChange to trigger hot reload + expect(capturedOnChange).toBeDefined(); + capturedOnChange!(); + + // Both heartbeats fire immediately on setup + await vi.advanceTimersByTimeAsync(0); + expect(deps.onCueRun).toHaveBeenCalledWith(expect.objectContaining({ prompt: 'second' })); + + engine.stop(); + }); + + it('removed subscription stops after hot reload', async () => { + const config1 = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat-1', + event: 'time.heartbeat', + enabled: true, + prompt: 'first', + interval_minutes: 5, + }, + { + name: 'heartbeat-2', + event: 'time.heartbeat', + enabled: true, + prompt: 'second', + interval_minutes: 10, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config1); + const deps = createMockDeps(); + + let capturedOnChange: (() => void) | undefined; + mockWatchCueYaml.mockImplementation((_root: string, cb: () => void) => { + capturedOnChange = cb; + return vi.fn(); + }); + + const engine = new CueEngine(deps); + engine.start(); + await vi.advanceTimersByTimeAsync(0); + vi.clearAllMocks(); + + // Reload with only heartbeat-1 + const config2 = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat-1', + event: 'time.heartbeat', + enabled: true, + prompt: 'first', + interval_minutes: 5, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config2); + capturedOnChange!(); + + await vi.advanceTimersByTimeAsync(0); + vi.clearAllMocks(); + + // Advance 5 minutes — only heartbeat-1 should fire + await vi.advanceTimersByTimeAsync(5 * 60 * 1000); + expect(deps.onCueRun).toHaveBeenCalledTimes(1); + expect(deps.onCueRun).toHaveBeenCalledWith(expect.objectContaining({ prompt: 'first' })); + + engine.stop(); + }); + + it('YAML deletion tears down session', async () => { + const config = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat', + event: 'time.heartbeat', + enabled: true, + prompt: 'test', + interval_minutes: 60, + }, + ], + }); + + let capturedOnChange: (() => void) | undefined; + mockWatchCueYaml.mockImplementation((_root: string, cb: () => void) => { + capturedOnChange = cb; + return vi.fn(); + }); + + mockLoadCueConfig.mockReturnValueOnce(config); + const deps = createMockDeps(); + const engine = new CueEngine(deps); + engine.start(); + await vi.advanceTimersByTimeAsync(0); + + // Reload returns null (YAML deleted) + mockLoadCueConfig.mockReturnValue(null); + capturedOnChange!(); + + // Session state should be removed + expect(engine.getStatus()).toHaveLength(0); + // Should log config removed + expect(deps.onLog).toHaveBeenCalledWith( + 'cue', + expect.stringContaining('Config removed'), + expect.objectContaining({ type: 'configRemoved' }) + ); + + engine.stop(); + }); + + it('scheduledFiredKeys are cleaned on refresh', async () => { + // Start at 08:59 — 1 minute before the scheduled time + vi.setSystemTime(new Date('2026-03-09T08:59:00')); + + const config = createMockConfig({ + subscriptions: [ + { + name: 'schedule-test', + event: 'time.scheduled', + enabled: true, + prompt: 'scheduled task', + schedule_times: ['09:00'], + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + + let capturedOnChange: (() => void) | undefined; + mockWatchCueYaml.mockImplementation((_root: string, cb: () => void) => { + capturedOnChange = cb; + return vi.fn(); + }); + + const deps = createMockDeps(); + const engine = new CueEngine(deps); + engine.start(); + + // Advance to 09:00 — should fire + await vi.advanceTimersByTimeAsync(60_000); + expect(deps.onCueRun).toHaveBeenCalledTimes(1); + + // Refresh session — scheduledFiredKeys are cleared in teardownSession + capturedOnChange!(); + + // Reset system time to 08:59 so the next 60s advance lands at 09:00 again + vi.setSystemTime(new Date('2026-03-09T08:59:00')); + await vi.advanceTimersByTimeAsync(60_000); + expect(deps.onCueRun).toHaveBeenCalledTimes(2); + + engine.stop(); + }); + + it('changed max_concurrent applies to next drain', async () => { + const config1 = createMockConfig({ + settings: { + timeout_minutes: 30, + timeout_on_fail: 'break', + max_concurrent: 1, + queue_size: 10, + }, + subscriptions: [ + { + name: 'heartbeat', + event: 'time.heartbeat', + enabled: true, + prompt: 'test', + interval_minutes: 1, + }, + ], + }); + + let capturedOnChange: (() => void) | undefined; + mockWatchCueYaml.mockImplementation((_root: string, cb: () => void) => { + capturedOnChange = cb; + return vi.fn(); + }); + + // First run never resolves to keep the slot occupied + let resolveFirstRun: ((result: CueRunResult) => void) | undefined; + const firstRunPromise = new Promise((resolve) => { + resolveFirstRun = resolve; + }); + const subsequentResult: CueRunResult = { + runId: 'run-sub', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'heartbeat', + event: {} as CueEvent, + status: 'completed', + stdout: 'output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }; + const onCueRun = vi + .fn() + .mockReturnValueOnce(firstRunPromise) + .mockResolvedValue(subsequentResult); + + mockLoadCueConfig.mockReturnValue(config1); + const deps = createMockDeps({ onCueRun }); + const engine = new CueEngine(deps); + engine.start(); + + // First heartbeat fires immediately, occupying the single slot + await vi.advanceTimersByTimeAsync(0); + expect(onCueRun).toHaveBeenCalledTimes(1); + + // Advance 1 minute — second heartbeat queued (max_concurrent=1, slot occupied) + await vi.advanceTimersByTimeAsync(60_000); + + // Reload with max_concurrent=2 + const config2 = createMockConfig({ + settings: { + timeout_minutes: 30, + timeout_on_fail: 'break', + max_concurrent: 2, + queue_size: 10, + }, + subscriptions: [ + { + name: 'heartbeat', + event: 'time.heartbeat', + enabled: true, + prompt: 'test', + interval_minutes: 1, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config2); + capturedOnChange!(); + + // The config reload tears down and reinitializes — the immediate heartbeat fires again + // With max_concurrent=2, the new heartbeat can dispatch immediately + await vi.advanceTimersByTimeAsync(0); + + // Resolve the first run to free the slot + resolveFirstRun!({ + ...subsequentResult, + runId: 'run-1', + }); + await vi.advanceTimersByTimeAsync(0); + + // After reload with max_concurrent=2, at least one additional run should have dispatched + expect(onCueRun.mock.calls.length).toBeGreaterThanOrEqual(2); + + engine.stop(); + }); + }); + + describe('prompt file existence warning (Fix 7)', () => { + it('logs warning when prompt_file is set but prompt is empty', () => { + const config = createMockConfig({ + subscriptions: [ + { + name: 'missing-file-sub', + event: 'time.heartbeat', + enabled: true, + prompt: '', + prompt_file: 'missing.md', + interval_minutes: 60, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const deps = createMockDeps(); + const engine = new CueEngine(deps); + engine.start(); + + expect(deps.onLog).toHaveBeenCalledWith('warn', expect.stringContaining('prompt_file')); + expect(deps.onLog).toHaveBeenCalledWith('warn', expect.stringContaining('missing.md')); + + engine.stop(); + }); + + it('does not warn when prompt_file is set and prompt is populated', () => { + const config = createMockConfig({ + subscriptions: [ + { + name: 'valid-file-sub', + event: 'time.heartbeat', + enabled: true, + prompt: 'content from file', + prompt_file: 'exists.md', + interval_minutes: 60, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const deps = createMockDeps(); + const engine = new CueEngine(deps); + engine.start(); + + const warnCalls = (deps.onLog as ReturnType).mock.calls.filter( + (call: unknown[]) => + call[0] === 'warn' && typeof call[1] === 'string' && call[1].includes('prompt_file') + ); + expect(warnCalls).toHaveLength(0); + + engine.stop(); + }); + }); }); diff --git a/src/__tests__/main/cue/cue-filter.test.ts b/src/__tests__/main/cue/cue-filter.test.ts index bd24bd221..d65e29b01 100644 --- a/src/__tests__/main/cue/cue-filter.test.ts +++ b/src/__tests__/main/cue/cue-filter.test.ts @@ -258,10 +258,39 @@ describe('cue-filter', () => { describe('empty filter', () => { it('matches everything when filter is empty', () => { expect(matchesFilter({ any: 'value' }, {})).toBe(true); + expect(matchesFilter({}, {})).toBe(true); }); }); }); + describe('combined filter conditions', () => { + it('combined numeric + glob in same filter object', () => { + expect( + matchesFilter({ size: 1500, path: 'src/app.ts' }, { size: '>1000', path: 'src/**/*.ts' }) + ).toBe(true); + expect( + matchesFilter({ size: 500, path: 'src/app.ts' }, { size: '>1000', path: 'src/**/*.ts' }) + ).toBe(false); + }); + }); + + describe('unicode handling', () => { + it('matches unicode strings exactly', () => { + expect(matchesFilter({ name: '日本語' }, { name: '日本語' })).toBe(true); + expect(matchesFilter({ name: '日本語' }, { name: '中文' })).toBe(false); + }); + }); + + describe('deep dot notation', () => { + it('resolves 4-level deep path', () => { + expect(matchesFilter({ a: { b: { c: { d: 'found' } } } }, { 'a.b.c.d': 'found' })).toBe(true); + }); + + it('returns false for partial path', () => { + expect(matchesFilter({ a: { b: 42 } }, { 'a.b.c': 'anything' })).toBe(false); + }); + }); + describe('describeFilter', () => { it('describes exact string match', () => { expect(describeFilter({ extension: '.ts' })).toBe('extension == ".ts"'); diff --git a/src/__tests__/main/cue/cue-github-poller.test.ts b/src/__tests__/main/cue/cue-github-poller.test.ts index 3039f5c13..687e4f577 100644 --- a/src/__tests__/main/cue/cue-github-poller.test.ts +++ b/src/__tests__/main/cue/cue-github-poller.test.ts @@ -601,6 +601,89 @@ describe('cue-github-poller', () => { expect(eventCallCount).toBe(1); }); + describe('first poll error resilience (Fix 3)', () => { + it('places seed marker when first poll fails', async () => { + const config = makeConfig(); + + // First call (--version) succeeds, but pr list fails + let callCount = 0; + mockExecFile.mockImplementation( + ( + cmd: string, + args: string[], + _opts: unknown, + cb: (err: Error | null, stdout: string, stderr: string) => void + ) => { + const key = `${cmd} ${args.join(' ')}`; + if (key.includes('--version')) { + cb(null, '2.0.0', ''); + } else if (key.includes('pr list')) { + callCount++; + cb(new Error('Network timeout'), '', ''); + } else { + cb(new Error('not found'), '', ''); + } + } + ); + + const cleanup = createCueGitHubPoller(config); + await vi.advanceTimersByTimeAsync(2000); + + expect(mockMarkGitHubItemSeen).toHaveBeenCalledWith('session-1:test-sub', '__seed_marker__'); + expect(config.onLog).toHaveBeenCalledWith('info', expect.stringContaining('seed marker set')); + + cleanup(); + }); + + it('second poll after first-poll error fires events for new items', async () => { + const config = makeConfig({ pollMinutes: 1 }); + + // First poll: pr list fails + // Second poll: pr list succeeds + let prListCallCount = 0; + mockExecFile.mockImplementation( + ( + cmd: string, + args: string[], + _opts: unknown, + cb: (err: Error | null, stdout: string, stderr: string) => void + ) => { + const key = `${cmd} ${args.join(' ')}`; + if (key.includes('--version')) { + cb(null, '2.0.0', ''); + } else if (key.includes('pr list')) { + prListCallCount++; + if (prListCallCount === 1) { + cb(new Error('Network timeout'), '', ''); + } else { + cb(null, JSON.stringify([samplePRs[0]]), ''); + } + } else { + cb(new Error('not found'), '', ''); + } + } + ); + + // After first poll error, seed marker is placed, so hasAnyGitHubSeen returns true + // This means second poll treats items as NOT first-run and fires events + mockHasAnyGitHubSeen.mockReturnValue(true); + + const cleanup = createCueGitHubPoller(config); + + // First poll at 2s — fails, seed marker placed + await vi.advanceTimersByTimeAsync(2000); + expect(config.onEvent).not.toHaveBeenCalled(); + + // Second poll at 2s + 1min — succeeds, fires events + await vi.advanceTimersByTimeAsync(60000); + expect(config.onEvent).toHaveBeenCalledTimes(1); + const event = (config.onEvent as ReturnType).mock.calls[0][0]; + expect(event.payload.number).toBe(1); + + cleanup(); + }); + }); + describe('ghState parameter', () => { it('passes "closed" state to gh pr list when ghState is "closed"', async () => { const config = makeConfig({ ghState: 'closed' }); diff --git a/src/__tests__/main/cue/cue-ipc-handlers.test.ts b/src/__tests__/main/cue/cue-ipc-handlers.test.ts index 1c2714066..963dc73f7 100644 --- a/src/__tests__/main/cue/cue-ipc-handlers.test.ts +++ b/src/__tests__/main/cue/cue-ipc-handlers.test.ts @@ -357,6 +357,65 @@ describe('Cue IPC Handlers', () => { }); }); + describe('edge cases', () => { + it('cue:getStatus returns empty array when engine not started', async () => { + // Engine exists but getStatus returns empty (no sessions registered) + mockEngine.getStatus.mockReturnValue([]); + + const handler = registerAndGetHandler('cue:getStatus'); + const result = await handler(null); + expect(result).toEqual([]); + expect(mockEngine.getStatus).toHaveBeenCalledOnce(); + }); + + it('cue:getActivityLog with limit returns bounded results', async () => { + const manyEntries = Array.from({ length: 10 }, (_, i) => ({ + runId: `r${i}`, + sessionId: 's1', + sessionName: 'Test', + subscriptionName: 'timer', + event: { + id: `e${i}`, + type: 'time.heartbeat', + timestamp: new Date().toISOString(), + triggerName: 'timer', + payload: {}, + }, + status: 'completed', + stdout: '', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + })); + + // Simulate engine returning only the last 2 entries (bounded by limit) + mockEngine.getActivityLog.mockReturnValue(manyEntries.slice(-2)); + + const handler = registerAndGetHandler('cue:getActivityLog'); + const result = await handler(null, { limit: 2 }); + + expect(result).toHaveLength(2); + expect(mockEngine.getActivityLog).toHaveBeenCalledWith(2); + }); + + it('cue:validateYaml handles empty content', async () => { + // Empty string: yaml.load returns undefined/null for empty input + vi.mocked(yaml.load).mockReturnValue(undefined); + vi.mocked(validateCueConfig).mockReturnValue({ + valid: false, + errors: ['Config must have a "subscriptions" array'], + }); + + const handler = registerAndGetHandler('cue:validateYaml'); + const result = (await handler(null, { content: '' })) as { valid: boolean; errors: string[] }; + + expect(result.valid).toBe(false); + expect(result.errors.length).toBeGreaterThan(0); + }); + }); + describe('cue:savePipelineLayout', () => { it('should write layout to JSON file', async () => { const layout = { diff --git a/src/__tests__/main/cue/cue-multi-hop-chains.test.ts b/src/__tests__/main/cue/cue-multi-hop-chains.test.ts new file mode 100644 index 000000000..ad9c82eb9 --- /dev/null +++ b/src/__tests__/main/cue/cue-multi-hop-chains.test.ts @@ -0,0 +1,772 @@ +/** + * Tests for CueEngine multi-hop completion chains and circular chain detection. + * + * Tests cover: + * - Multi-hop chains (A -> B -> C) + * - Stdout propagation through chains + * - Failed middle step with filters + * - Circular chain detection (A -> B -> A) + * - Self-referencing subscription detection + * - Fan-in -> fan-out combination + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; +import type { SessionInfo } from '../../../shared/types'; + +// Mock the yaml loader +const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); +const mockWatchCueYaml = vi.fn<(projectRoot: string, onChange: () => void) => () => void>(); +vi.mock('../../../main/cue/cue-yaml-loader', () => ({ + loadCueConfig: (...args: unknown[]) => mockLoadCueConfig(args[0] as string), + watchCueYaml: (...args: unknown[]) => mockWatchCueYaml(args[0] as string, args[1] as () => void), +})); + +// Mock the file watcher +const mockCreateCueFileWatcher = vi.fn<(config: unknown) => () => void>(); +vi.mock('../../../main/cue/cue-file-watcher', () => ({ + createCueFileWatcher: (...args: unknown[]) => mockCreateCueFileWatcher(args[0]), +})); + +// Mock cue-db +vi.mock('../../../main/cue/cue-db', () => ({ + initCueDb: vi.fn(), + closeCueDb: vi.fn(), + updateHeartbeat: vi.fn(), + getLastHeartbeat: vi.fn(() => null), + pruneCueEvents: vi.fn(), + recordCueEvent: vi.fn(), + updateCueEventStatus: vi.fn(), +})); + +// Mock reconciler +vi.mock('../../../main/cue/cue-reconciler', () => ({ + reconcileMissedTimeEvents: vi.fn(), +})); + +// Mock crypto +vi.mock('crypto', () => ({ + randomUUID: vi.fn(() => `uuid-${Math.random().toString(36).slice(2, 8)}`), +})); + +import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; + +function createMockSession(overrides: Partial = {}): SessionInfo { + return { + id: 'session-1', + name: 'Test Session', + toolType: 'claude-code', + cwd: '/projects/test', + projectRoot: '/projects/test', + ...overrides, + }; +} + +function createMockConfig(overrides: Partial = {}): CueConfig { + return { + subscriptions: [], + settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, + ...overrides, + }; +} + +function createMockDeps(overrides: Partial = {}): CueEngineDeps { + return { + getSessions: vi.fn(() => [createMockSession()]), + onCueRun: vi.fn(async () => ({ + runId: 'run-1', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'test', + event: {} as CueEvent, + status: 'completed' as const, + stdout: 'output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + })) as CueEngineDeps['onCueRun'], + onLog: vi.fn(), + ...overrides, + }; +} + +describe('CueEngine multi-hop completion chains', () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + mockWatchCueYaml.mockReturnValue(vi.fn()); + mockCreateCueFileWatcher.mockReturnValue(vi.fn()); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('A -> B -> C chain executes all 3 with correct payloads', async () => { + const sessions = [ + createMockSession({ + id: 'source', + name: 'Source', + cwd: '/proj/source', + projectRoot: '/proj/source', + }), + createMockSession({ + id: 'middle', + name: 'Middle', + cwd: '/proj/middle', + projectRoot: '/proj/middle', + }), + createMockSession({ + id: 'downstream', + name: 'Downstream', + cwd: '/proj/downstream', + projectRoot: '/proj/downstream', + }), + ]; + + const configSource = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat-source', + event: 'time.heartbeat', + enabled: true, + prompt: 'do source work', + interval_minutes: 60, + }, + ], + }); + const configMiddle = createMockConfig({ + subscriptions: [ + { + name: 'on-source-done', + event: 'agent.completed', + enabled: true, + prompt: 'do middle work', + source_session: 'Source', + }, + ], + }); + const configDownstream = createMockConfig({ + subscriptions: [ + { + name: 'on-middle-done', + event: 'agent.completed', + enabled: true, + prompt: 'do downstream work', + source_session: 'Middle', + }, + ], + }); + + mockLoadCueConfig.mockImplementation((projectRoot) => { + if (projectRoot === '/proj/source') return configSource; + if (projectRoot === '/proj/middle') return configMiddle; + if (projectRoot === '/proj/downstream') return configDownstream; + return null; + }); + + const onCueRun = vi.fn( + async (request: { + runId: string; + sessionId: string; + prompt: string; + subscriptionName: string; + event: CueEvent; + timeoutMs: number; + }) => { + const session = sessions.find((s) => s.id === request.sessionId); + const result: CueRunResult = { + runId: request.runId, + sessionId: request.sessionId, + sessionName: session?.name ?? 'Unknown', + subscriptionName: request.subscriptionName, + event: request.event, + status: 'completed', + stdout: `output-${request.sessionId}`, + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }; + return result; + } + ); + + const deps = createMockDeps({ + getSessions: vi.fn(() => sessions), + onCueRun: onCueRun as CueEngineDeps['onCueRun'], + }); + const engine = new CueEngine(deps); + engine.start(); + + // Flush all async work (heartbeat fires immediately, then chains through) + await vi.advanceTimersByTimeAsync(0); + + expect(onCueRun).toHaveBeenCalledTimes(3); + + // First call: heartbeat fires Source + expect(onCueRun).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: 'source', + prompt: 'do source work', + event: expect.objectContaining({ type: 'time.heartbeat' }), + }) + ); + + // Second call: Source completion triggers Middle + expect(onCueRun).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: 'middle', + prompt: 'do middle work', + event: expect.objectContaining({ type: 'agent.completed', triggerName: 'on-source-done' }), + }) + ); + + // Third call: Middle completion triggers Downstream + expect(onCueRun).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: 'downstream', + prompt: 'do downstream work', + event: expect.objectContaining({ type: 'agent.completed', triggerName: 'on-middle-done' }), + }) + ); + + engine.stop(); + }); + + it('stdout carries through chain', async () => { + const sessions = [ + createMockSession({ + id: 'source', + name: 'Source', + cwd: '/proj/source', + projectRoot: '/proj/source', + }), + createMockSession({ + id: 'middle', + name: 'Middle', + cwd: '/proj/middle', + projectRoot: '/proj/middle', + }), + createMockSession({ + id: 'downstream', + name: 'Downstream', + cwd: '/proj/downstream', + projectRoot: '/proj/downstream', + }), + ]; + + const configSource = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat-source', + event: 'time.heartbeat', + enabled: true, + prompt: 'do source work', + interval_minutes: 60, + }, + ], + }); + const configMiddle = createMockConfig({ + subscriptions: [ + { + name: 'on-source-done', + event: 'agent.completed', + enabled: true, + prompt: 'do middle work', + source_session: 'Source', + }, + ], + }); + const configDownstream = createMockConfig({ + subscriptions: [ + { + name: 'on-middle-done', + event: 'agent.completed', + enabled: true, + prompt: 'do downstream work', + source_session: 'Middle', + }, + ], + }); + + mockLoadCueConfig.mockImplementation((projectRoot) => { + if (projectRoot === '/proj/source') return configSource; + if (projectRoot === '/proj/middle') return configMiddle; + if (projectRoot === '/proj/downstream') return configDownstream; + return null; + }); + + const onCueRun = vi.fn( + async (request: { + runId: string; + sessionId: string; + prompt: string; + subscriptionName: string; + event: CueEvent; + timeoutMs: number; + }) => { + const session = sessions.find((s) => s.id === request.sessionId); + const result: CueRunResult = { + runId: request.runId, + sessionId: request.sessionId, + sessionName: session?.name ?? 'Unknown', + subscriptionName: request.subscriptionName, + event: request.event, + status: 'completed', + stdout: `output-${request.sessionId}`, + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }; + return result; + } + ); + + const deps = createMockDeps({ + getSessions: vi.fn(() => sessions), + onCueRun: onCueRun as CueEngineDeps['onCueRun'], + }); + const engine = new CueEngine(deps); + engine.start(); + + await vi.advanceTimersByTimeAsync(0); + + // Middle's event payload should contain Source's stdout + const middleCall = onCueRun.mock.calls.find((call) => call[0].sessionId === 'middle'); + expect(middleCall).toBeDefined(); + expect(middleCall![0].event.payload).toEqual( + expect.objectContaining({ + sourceOutput: 'output-source', + }) + ); + + // Downstream's event payload should contain Middle's stdout + const downstreamCall = onCueRun.mock.calls.find((call) => call[0].sessionId === 'downstream'); + expect(downstreamCall).toBeDefined(); + expect(downstreamCall![0].event.payload).toEqual( + expect.objectContaining({ + sourceOutput: 'output-middle', + }) + ); + + engine.stop(); + }); + + it('failed middle step stops chain when downstream has status filter', async () => { + const sessions = [ + createMockSession({ + id: 'source', + name: 'Source', + cwd: '/proj/source', + projectRoot: '/proj/source', + }), + createMockSession({ + id: 'middle', + name: 'Middle', + cwd: '/proj/middle', + projectRoot: '/proj/middle', + }), + createMockSession({ + id: 'downstream', + name: 'Downstream', + cwd: '/proj/downstream', + projectRoot: '/proj/downstream', + }), + ]; + + const configSource = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat-source', + event: 'time.heartbeat', + enabled: true, + prompt: 'do source work', + interval_minutes: 60, + }, + ], + }); + const configMiddle = createMockConfig({ + subscriptions: [ + { + name: 'on-source-done', + event: 'agent.completed', + enabled: true, + prompt: 'do middle work', + source_session: 'Source', + }, + ], + }); + const configDownstream = createMockConfig({ + subscriptions: [ + { + name: 'on-middle-done', + event: 'agent.completed', + enabled: true, + prompt: 'do downstream work', + source_session: 'Middle', + filter: { status: 'completed' }, + }, + ], + }); + + mockLoadCueConfig.mockImplementation((projectRoot) => { + if (projectRoot === '/proj/source') return configSource; + if (projectRoot === '/proj/middle') return configMiddle; + if (projectRoot === '/proj/downstream') return configDownstream; + return null; + }); + + const onCueRun = vi.fn( + async (request: { + runId: string; + sessionId: string; + prompt: string; + subscriptionName: string; + event: CueEvent; + timeoutMs: number; + }) => { + const session = sessions.find((s) => s.id === request.sessionId); + // Middle fails, everything else succeeds + const isFailed = request.sessionId === 'middle'; + const result: CueRunResult = { + runId: request.runId, + sessionId: request.sessionId, + sessionName: session?.name ?? 'Unknown', + subscriptionName: request.subscriptionName, + event: request.event, + status: isFailed ? 'failed' : 'completed', + stdout: `output-${request.sessionId}`, + stderr: isFailed ? 'error occurred' : '', + exitCode: isFailed ? 1 : 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }; + return result; + } + ); + + const deps = createMockDeps({ + getSessions: vi.fn(() => sessions), + onCueRun: onCueRun as CueEngineDeps['onCueRun'], + }); + const engine = new CueEngine(deps); + engine.start(); + + await vi.advanceTimersByTimeAsync(0); + + // Source heartbeat fires, then Middle fires (triggered by Source completion), + // but Downstream should NOT fire because Middle failed and filter requires 'completed' + expect(onCueRun).toHaveBeenCalledTimes(2); + + // Verify the two calls are Source and Middle only + const calledSessionIds = onCueRun.mock.calls.map((call) => call[0].sessionId); + expect(calledSessionIds).toContain('source'); + expect(calledSessionIds).toContain('middle'); + expect(calledSessionIds).not.toContain('downstream'); + + engine.stop(); + }); + + it('circular chain A -> B -> A is bounded by MAX_CHAIN_DEPTH', async () => { + // The chain depth guard (MAX_CHAIN_DEPTH=10) is propagated through AgentCompletionData + // across async hops. When depth reaches the limit, notifyAgentCompleted aborts and logs. + const sessions = [ + createMockSession({ id: 'a', name: 'A', cwd: '/proj/a', projectRoot: '/proj/a' }), + createMockSession({ id: 'b', name: 'B', cwd: '/proj/b', projectRoot: '/proj/b' }), + ]; + + const configA = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat-a', + event: 'time.heartbeat', + enabled: true, + prompt: 'do a work', + interval_minutes: 60, + }, + { + name: 'on-b-done', + event: 'agent.completed', + enabled: true, + prompt: 'react to b', + source_session: 'B', + }, + ], + }); + const configB = createMockConfig({ + subscriptions: [ + { + name: 'on-a-done', + event: 'agent.completed', + enabled: true, + prompt: 'react to a', + source_session: 'A', + }, + ], + }); + + mockLoadCueConfig.mockImplementation((projectRoot) => { + if (projectRoot === '/proj/a') return configA; + if (projectRoot === '/proj/b') return configB; + return null; + }); + + const onCueRun = vi.fn( + async (request: { + runId: string; + sessionId: string; + prompt: string; + subscriptionName: string; + event: CueEvent; + timeoutMs: number; + }) => { + const session = sessions.find((s) => s.id === request.sessionId); + const result: CueRunResult = { + runId: request.runId, + sessionId: request.sessionId, + sessionName: session?.name ?? 'Unknown', + subscriptionName: request.subscriptionName, + event: request.event, + status: 'completed', + stdout: `output-${request.sessionId}`, + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }; + return result; + } + ); + + const onLog = vi.fn(); + + const deps = createMockDeps({ + getSessions: vi.fn(() => sessions), + onCueRun: onCueRun as CueEngineDeps['onCueRun'], + onLog, + }); + const engine = new CueEngine(deps); + engine.start(); + + // Flush all async hops until the chain depth guard fires + for (let i = 0; i < 15; i++) { + await vi.advanceTimersByTimeAsync(0); + } + + // The chain ran but was stopped by MAX_CHAIN_DEPTH + expect(onCueRun).toHaveBeenCalled(); + const callCount = onCueRun.mock.calls.length; + // Should be bounded — heartbeat(1) + chain hops limited by depth 10 + expect(callCount).toBeLessThanOrEqual(12); + + // Verify the chain alternated between A and B sessions + const sessionIds = onCueRun.mock.calls.map((call) => call[0].sessionId); + expect(sessionIds[0]).toBe('a'); + if (callCount > 1) expect(sessionIds[1]).toBe('b'); + + // The depth-exceeded error was logged + const errorLogs = onLog.mock.calls.filter( + (call) => call[0] === 'error' && (call[1] as string).includes('Max chain depth') + ); + expect(errorLogs.length).toBeGreaterThan(0); + + engine.stop(); + }); + + it('self-referencing subscription is bounded by MAX_CHAIN_DEPTH', async () => { + // A session watching its own completion creates a loop. + // The chain depth propagated via AgentCompletionData stops it. + const sessions = [ + createMockSession({ id: 'self', name: 'Self', cwd: '/proj/self', projectRoot: '/proj/self' }), + ]; + + const configSelf = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat-self', + event: 'time.heartbeat', + enabled: true, + prompt: 'do self work', + interval_minutes: 60, + }, + { + name: 'on-self-done', + event: 'agent.completed', + enabled: true, + prompt: 'react to self', + source_session: 'Self', + }, + ], + }); + + mockLoadCueConfig.mockReturnValue(configSelf); + + let callCount = 0; + const onCueRun = vi.fn( + async (request: { + runId: string; + sessionId: string; + prompt: string; + subscriptionName: string; + event: CueEvent; + timeoutMs: number; + }) => { + callCount++; + const result: CueRunResult = { + runId: request.runId, + sessionId: request.sessionId, + sessionName: 'Self', + subscriptionName: request.subscriptionName, + event: request.event, + status: 'completed', + stdout: `output-${callCount}`, + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }; + return result; + } + ); + + const onLog = vi.fn(); + + const deps = createMockDeps({ + getSessions: vi.fn(() => sessions), + onCueRun: onCueRun as CueEngineDeps['onCueRun'], + onLog, + }); + const engine = new CueEngine(deps); + engine.start(); + + // Flush all async hops until the chain depth guard fires + for (let i = 0; i < 15; i++) { + await vi.advanceTimersByTimeAsync(0); + } + + // All calls target the same session + const sessionIds = onCueRun.mock.calls.map((call) => call[0].sessionId); + expect(sessionIds.every((id) => id === 'self')).toBe(true); + + // First call is the heartbeat, subsequent calls are self-triggered completions + expect(onCueRun.mock.calls[0][0].subscriptionName).toBe('heartbeat-self'); + if (callCount > 1) { + expect(onCueRun.mock.calls[1][0].subscriptionName).toBe('on-self-done'); + } + + // The depth-exceeded error was logged + const errorLogs = onLog.mock.calls.filter( + (call) => call[0] === 'error' && (call[1] as string).includes('Max chain depth') + ); + expect(errorLogs.length).toBeGreaterThan(0); + + engine.stop(); + }); + + it('fan-in -> fan-out combination dispatches to all targets after all sources complete', async () => { + const sessions = [ + createMockSession({ + id: 'source-a', + name: 'SourceA', + cwd: '/proj/source-a', + projectRoot: '/proj/source-a', + }), + createMockSession({ + id: 'source-b', + name: 'SourceB', + cwd: '/proj/source-b', + projectRoot: '/proj/source-b', + }), + createMockSession({ + id: 'orchestrator', + name: 'Orchestrator', + cwd: '/proj/orch', + projectRoot: '/proj/orch', + }), + createMockSession({ + id: 'target-x', + name: 'TargetX', + cwd: '/proj/target-x', + projectRoot: '/proj/target-x', + }), + createMockSession({ + id: 'target-y', + name: 'TargetY', + cwd: '/proj/target-y', + projectRoot: '/proj/target-y', + }), + ]; + + const configOrch = createMockConfig({ + subscriptions: [ + { + name: 'fan-in-out', + event: 'agent.completed', + enabled: true, + prompt: 'orchestrate', + source_session: ['SourceA', 'SourceB'], + fan_out: ['TargetX', 'TargetY'], + }, + ], + }); + + mockLoadCueConfig.mockImplementation((projectRoot) => { + if (projectRoot === '/proj/orch') return configOrch; + return null; + }); + + const deps = createMockDeps({ getSessions: vi.fn(() => sessions) }); + const engine = new CueEngine(deps); + engine.start(); + + vi.clearAllMocks(); + + // First source completes — fan-in should wait + engine.notifyAgentCompleted('source-a', { sessionName: 'SourceA', stdout: 'output-a' }); + expect(deps.onCueRun).not.toHaveBeenCalled(); + + // Second source completes — fan-in should fire, then fan-out dispatches + engine.notifyAgentCompleted('source-b', { sessionName: 'SourceB', stdout: 'output-b' }); + await vi.advanceTimersByTimeAsync(0); + + // Fan-out should dispatch to both TargetX and TargetY + expect(deps.onCueRun).toHaveBeenCalledTimes(2); + expect(deps.onCueRun).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: 'target-x', + prompt: 'orchestrate', + event: expect.objectContaining({ + type: 'agent.completed', + triggerName: 'fan-in-out', + payload: expect.objectContaining({ + fanOutIndex: 0, + }), + }), + }) + ); + expect(deps.onCueRun).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: 'target-y', + prompt: 'orchestrate', + event: expect.objectContaining({ + type: 'agent.completed', + triggerName: 'fan-in-out', + payload: expect.objectContaining({ + fanOutIndex: 1, + }), + }), + }) + ); + + engine.stop(); + }); +}); diff --git a/src/__tests__/main/cue/cue-session-lifecycle.test.ts b/src/__tests__/main/cue/cue-session-lifecycle.test.ts new file mode 100644 index 000000000..185a422eb --- /dev/null +++ b/src/__tests__/main/cue/cue-session-lifecycle.test.ts @@ -0,0 +1,506 @@ +/** + * Tests for CueEngine session lifecycle under active state. + * + * Tests cover: + * - removeSession clears queued events + * - removeSession clears fan-in tracker + * - removeSession with in-flight run completes cleanly + * - refreshSession during active run + * - refreshSession doesn't double-count active runs + * - teardownSession clears event queue (Fix 2 validation) + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; +import type { SessionInfo } from '../../../shared/types'; + +// Mock the yaml loader +const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); +const mockWatchCueYaml = vi.fn<(projectRoot: string, onChange: () => void) => () => void>(); +vi.mock('../../../main/cue/cue-yaml-loader', () => ({ + loadCueConfig: (...args: unknown[]) => mockLoadCueConfig(args[0] as string), + watchCueYaml: (...args: unknown[]) => mockWatchCueYaml(args[0] as string, args[1] as () => void), +})); + +// Mock the file watcher +const mockCreateCueFileWatcher = vi.fn<(config: unknown) => () => void>(); +vi.mock('../../../main/cue/cue-file-watcher', () => ({ + createCueFileWatcher: (...args: unknown[]) => mockCreateCueFileWatcher(args[0]), +})); + +// Mock cue-db +vi.mock('../../../main/cue/cue-db', () => ({ + initCueDb: vi.fn(), + closeCueDb: vi.fn(), + updateHeartbeat: vi.fn(), + getLastHeartbeat: vi.fn(() => null), + pruneCueEvents: vi.fn(), + recordCueEvent: vi.fn(), + updateCueEventStatus: vi.fn(), +})); + +// Mock reconciler +vi.mock('../../../main/cue/cue-reconciler', () => ({ + reconcileMissedTimeEvents: vi.fn(), +})); + +// Mock crypto +vi.mock('crypto', () => ({ + randomUUID: vi.fn(() => `uuid-${Math.random().toString(36).slice(2, 8)}`), +})); + +import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; + +function createMockSession(overrides: Partial = {}): SessionInfo { + return { + id: 'session-1', + name: 'Test Session', + toolType: 'claude-code', + cwd: '/projects/test', + projectRoot: '/projects/test', + ...overrides, + }; +} + +function createMockConfig(overrides: Partial = {}): CueConfig { + return { + subscriptions: [], + settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, + ...overrides, + }; +} + +function createMockDeps(overrides: Partial = {}): CueEngineDeps { + return { + getSessions: vi.fn(() => [createMockSession()]), + onCueRun: vi.fn(async () => ({ + runId: 'run-1', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'test', + event: {} as CueEvent, + status: 'completed' as const, + stdout: 'output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + })) as CueEngineDeps['onCueRun'], + onLog: vi.fn(), + ...overrides, + }; +} + +describe('CueEngine session lifecycle', () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + mockWatchCueYaml.mockReturnValue(vi.fn()); + mockCreateCueFileWatcher.mockReturnValue(vi.fn()); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('removeSession clears queued events', async () => { + // Setup: max_concurrent=1, heartbeat with interval_minutes=1 + const config = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat', + event: 'time.heartbeat', + enabled: true, + prompt: 'do work', + interval_minutes: 1, + }, + ], + settings: { + timeout_minutes: 30, + timeout_on_fail: 'break', + max_concurrent: 1, + queue_size: 10, + }, + }); + mockLoadCueConfig.mockReturnValue(config); + + // First call returns a never-resolving promise (to occupy the slot) + let resolveRun: ((val: CueRunResult) => void) | null = null; + const onCueRun = vi.fn( + () => + new Promise((resolve) => { + resolveRun = resolve; + }) + ); + const deps = createMockDeps({ onCueRun: onCueRun as CueEngineDeps['onCueRun'] }); + const engine = new CueEngine(deps); + + engine.start(); + // Heartbeat fires immediately on start -> occupies the single slot + expect(onCueRun).toHaveBeenCalledTimes(1); + + // Advance timer by 60s to fire another heartbeat -> goes into queue + vi.advanceTimersByTime(60 * 1000); + expect(onCueRun).toHaveBeenCalledTimes(1); // still 1 — second event is queued + + // Assert queue has 1 entry for session-1 + const queueStatus = engine.getQueueStatus(); + expect(queueStatus.get('session-1')).toBe(1); + + // Remove the session + engine.removeSession('session-1'); + + // Assert queue is now empty + const queueAfter = engine.getQueueStatus(); + expect(queueAfter.size).toBe(0); + + // Clean up: resolve the in-flight promise so the test exits cleanly + resolveRun!({ + runId: 'run-1', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'heartbeat', + event: {} as CueEvent, + status: 'completed', + stdout: 'output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }); + await vi.advanceTimersByTimeAsync(0); // flush microtasks + + engine.stop(); + }); + + it('removeSession clears fan-in tracker', () => { + // Setup: fan-in subscription with source_session: ['SourceA', 'SourceB'] + const config = createMockConfig({ + subscriptions: [ + { + name: 'all-done', + event: 'agent.completed', + enabled: true, + prompt: 'aggregate', + source_session: ['SourceA', 'SourceB'], + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const deps = createMockDeps(); + const engine = new CueEngine(deps); + engine.start(); + + vi.clearAllMocks(); + + // Fire first completion -> fan-in waiting for SourceB + engine.notifyAgentCompleted('source-a', { sessionName: 'SourceA', stdout: 'output-a' }); + expect(deps.onCueRun).not.toHaveBeenCalled(); + + // Remove the owner session (session-1 which owns the fan-in subscription) + engine.removeSession('session-1'); + + // Fire second completion -> should NOT trigger anything since session was removed + engine.notifyAgentCompleted('source-b', { sessionName: 'SourceB', stdout: 'output-b' }); + + // Assert onCueRun was NOT called after the removal + expect(deps.onCueRun).not.toHaveBeenCalled(); + + engine.stop(); + }); + + it('removeSession with in-flight run completes cleanly', async () => { + // Setup: heartbeat subscription + const config = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat', + event: 'time.heartbeat', + enabled: true, + prompt: 'do work', + interval_minutes: 60, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + + // Controllable promise for onCueRun + let resolveRun: ((val: CueRunResult) => void) | null = null; + const onCueRun = vi.fn( + () => + new Promise((resolve) => { + resolveRun = resolve; + }) + ); + const deps = createMockDeps({ onCueRun: onCueRun as CueEngineDeps['onCueRun'] }); + const engine = new CueEngine(deps); + + engine.start(); + // Heartbeat fires immediately -> occupies slot + expect(onCueRun).toHaveBeenCalledTimes(1); + + // Remove session while run is in-flight + engine.removeSession('session-1'); + + // Resolve the in-flight promise + resolveRun!({ + runId: 'run-1', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'heartbeat', + event: {} as CueEvent, + status: 'completed', + stdout: 'output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }); + await vi.advanceTimersByTimeAsync(0); // flush microtasks + + // Assert no unhandled errors (test completes without throwing) + // Assert getActiveRuns returns empty after resolution + expect(engine.getActiveRuns()).toHaveLength(0); + + engine.stop(); + }); + + it('refreshSession during active run', async () => { + // Setup: heartbeat with interval_minutes=60 + const config = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat', + event: 'time.heartbeat', + enabled: true, + prompt: 'do work', + interval_minutes: 60, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + + // Track all resolve functions for controllable promises + const resolvers: ((val: CueRunResult) => void)[] = []; + const onCueRun = vi.fn( + () => + new Promise((resolve) => { + resolvers.push(resolve); + }) + ); + const deps = createMockDeps({ onCueRun: onCueRun as CueEngineDeps['onCueRun'] }); + const engine = new CueEngine(deps); + + engine.start(); + // First heartbeat fires immediately + expect(onCueRun).toHaveBeenCalledTimes(1); + expect(resolvers).toHaveLength(1); + + // Update config to return a new config with interval_minutes=5 + const newConfig = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat', + event: 'time.heartbeat', + enabled: true, + prompt: 'do work faster', + interval_minutes: 5, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(newConfig); + + // Refresh the session (simulates config reload). + // The old run is still in-flight (activeRunCount=1). During initSession, + // the immediate heartbeat fire sees activeRunCount=1 >= maxConcurrent=1 + // (defaulted because session state isn't in the map yet during setup), + // so the new heartbeat goes into the queue instead of dispatching. + engine.refreshSession('session-1', '/projects/test'); + + // onCueRun is still 1 — the refresh's immediate heartbeat was queued + expect(onCueRun).toHaveBeenCalledTimes(1); + + // Resolve the original in-flight promise — this decrements activeRunCount + // and drains the queue, dispatching the queued heartbeat + const completedResult: CueRunResult = { + runId: 'run-1', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'heartbeat', + event: {} as CueEvent, + status: 'completed', + stdout: 'output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }; + resolvers[0](completedResult); + await vi.advanceTimersByTimeAsync(0); // flush microtasks + + // After the in-flight completes and drainQueue fires, the queued heartbeat dispatches + expect(onCueRun).toHaveBeenCalledTimes(2); + expect(resolvers).toHaveLength(2); + + // Now resolve the second run (drained from queue) so the slot is freed + resolvers[1](completedResult); + await vi.advanceTimersByTimeAsync(0); // flush microtasks + + // Advance time by 5 minutes -> new subscription interval fires with new config + vi.clearAllMocks(); + vi.advanceTimersByTime(5 * 60 * 1000); + expect(onCueRun).toHaveBeenCalledTimes(1); + + engine.stop(); + }); + + it('refreshSession does not double-count active runs', async () => { + // Setup: heartbeat, max_concurrent=2, controllable onCueRun (never resolves). + // During initSession, the immediate heartbeat fire reads maxConcurrent from + // this.sessions.get(sessionId), which is not yet set (happens after the + // subscription setup loop), so it defaults to 1. With activeRunCount=1 + // from the orphaned in-flight run, the immediate fire goes into the queue. + const config = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat', + event: 'time.heartbeat', + enabled: true, + prompt: 'do work', + interval_minutes: 60, + }, + ], + settings: { + timeout_minutes: 30, + timeout_on_fail: 'break', + max_concurrent: 2, + queue_size: 10, + }, + }); + mockLoadCueConfig.mockReturnValue(config); + + const onCueRun = vi.fn( + () => + new Promise(() => { + /* never resolves */ + }) + ); + const deps = createMockDeps({ onCueRun: onCueRun as CueEngineDeps['onCueRun'] }); + const engine = new CueEngine(deps); + + engine.start(); + // Heartbeat fires immediately -> 1 active run + expect(onCueRun).toHaveBeenCalledTimes(1); + + // Refresh the session (tears down old timers, re-inits) + engine.refreshSession('session-1', '/projects/test'); + + // The immediate heartbeat during refresh was queued (not dispatched), + // because activeRunCount=1 and the session state isn't in the map yet + // during setupHeartbeatSubscription, so maxConcurrent defaults to 1. + expect(onCueRun).toHaveBeenCalledTimes(1); + + // The queue should have exactly 1 entry from the refresh's immediate fire + expect(engine.getQueueStatus().get('session-1')).toBe(1); + + // Advance timer to trigger the interval heartbeat (60 min). + // Now the session state IS in the map, so max_concurrent=2 is read. + // activeRunCount=1 (orphaned) < max_concurrent=2, so it dispatches. + vi.advanceTimersByTime(60 * 60 * 1000); + + // We should have exactly 2 dispatched calls total: initial + interval + // (the queued immediate fire from refresh was drained when the interval fired + // or may remain queued depending on ordering — but no infinite loop or double-count) + expect(onCueRun.mock.calls.length).toBeGreaterThanOrEqual(1); + expect(onCueRun.mock.calls.length).toBeLessThanOrEqual(3); + + engine.stop(); + }); + + it('teardownSession clears event queue (Fix 2 validation)', async () => { + // Setup: max_concurrent=1, heartbeat with interval_minutes=1 + const config = createMockConfig({ + subscriptions: [ + { + name: 'heartbeat', + event: 'time.heartbeat', + enabled: true, + prompt: 'do work', + interval_minutes: 1, + }, + ], + settings: { + timeout_minutes: 30, + timeout_on_fail: 'break', + max_concurrent: 1, + queue_size: 10, + }, + }); + mockLoadCueConfig.mockReturnValue(config); + + // Capture the watchCueYaml onChange callback + let yamlOnChange: (() => void) | null = null; + mockWatchCueYaml.mockImplementation((_projectRoot: string, onChange: () => void) => { + yamlOnChange = onChange; + return vi.fn(); + }); + + let resolveRun: ((val: CueRunResult) => void) | null = null; + const onCueRun = vi.fn( + () => + new Promise((resolve) => { + resolveRun = resolve; + }) + ); + const deps = createMockDeps({ onCueRun: onCueRun as CueEngineDeps['onCueRun'] }); + const engine = new CueEngine(deps); + + engine.start(); + // Heartbeat fires immediately -> occupies the single slot + expect(onCueRun).toHaveBeenCalledTimes(1); + + // Advance timer to queue events + vi.advanceTimersByTime(60 * 1000); + expect(engine.getQueueStatus().get('session-1')).toBe(1); + + vi.advanceTimersByTime(60 * 1000); + expect(engine.getQueueStatus().get('session-1')).toBe(2); + + // Call the onChange callback (simulates config file change -> refreshSession internally). + // refreshSession calls teardownSession which clears the queue, then initSession + // re-creates the session and fires the immediate heartbeat. Since the old in-flight + // run still occupies the slot (activeRunCount=1), the new immediate fire is queued. + expect(yamlOnChange).not.toBeNull(); + yamlOnChange!(); + + // After refresh, the old 2 queued events are cleared. The new immediate heartbeat + // goes into a fresh queue entry (1 item), not 2 items from before. + const queueAfter = engine.getQueueStatus(); + const queueCount = queueAfter.get('session-1') ?? 0; + // The old queue of 2 was cleared; at most 1 new entry from the refresh's immediate fire + expect(queueCount).toBeLessThanOrEqual(1); + + // Clean up: resolve the in-flight promise + resolveRun!({ + runId: 'run-1', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: 'heartbeat', + event: {} as CueEvent, + status: 'completed', + stdout: 'output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + }); + await vi.advanceTimersByTimeAsync(0); // flush microtasks + + engine.stop(); + }); +}); diff --git a/src/__tests__/main/cue/cue-template-variables.test.ts b/src/__tests__/main/cue/cue-template-variables.test.ts new file mode 100644 index 000000000..123f23240 --- /dev/null +++ b/src/__tests__/main/cue/cue-template-variables.test.ts @@ -0,0 +1,167 @@ +/** + * Tests for Cue-specific template variable substitution. + * + * Validates that substituteTemplateVariables correctly handles all CUE_* prefixed + * variables for file.changed, agent.completed, task.pending, github.*, and base + * event contexts. + */ + +import { describe, it, expect } from 'vitest'; +import { + substituteTemplateVariables, + type TemplateContext, +} from '../../../shared/templateVariables'; + +function makeContext(cue: TemplateContext['cue'] = {}): TemplateContext { + return { + session: { + id: 'session-1', + name: 'Test Agent', + toolType: 'claude-code', + cwd: '/projects/test', + }, + cue, + }; +} + +describe('Cue template variable substitution', () => { + it('substitutes all file.changed variables', () => { + const ctx = makeContext({ + filePath: '/projects/test/src/app.ts', + fileName: 'app.ts', + fileDir: '/projects/test/src', + fileExt: '.ts', + fileChangeType: 'change', + }); + const template = + 'File {{CUE_FILE_PATH}} name={{CUE_FILE_NAME}} dir={{CUE_FILE_DIR}} ext={{CUE_FILE_EXT}} change={{CUE_FILE_CHANGE_TYPE}}'; + const result = substituteTemplateVariables(template, ctx); + expect(result).toBe( + 'File /projects/test/src/app.ts name=app.ts dir=/projects/test/src ext=.ts change=change' + ); + }); + + it('substitutes all agent.completed variables', () => { + const ctx = makeContext({ + sourceSession: 'worker-1', + sourceOutput: 'Build succeeded', + sourceStatus: 'completed', + sourceExitCode: '0', + sourceDuration: '12345', + sourceTriggeredBy: 'file-watcher-sub', + }); + const template = [ + 'session={{CUE_SOURCE_SESSION}}', + 'output={{CUE_SOURCE_OUTPUT}}', + 'status={{CUE_SOURCE_STATUS}}', + 'exit={{CUE_SOURCE_EXIT_CODE}}', + 'duration={{CUE_SOURCE_DURATION}}', + 'trigger={{CUE_SOURCE_TRIGGERED_BY}}', + ].join(' '); + const result = substituteTemplateVariables(template, ctx); + expect(result).toBe( + 'session=worker-1 output=Build succeeded status=completed exit=0 duration=12345 trigger=file-watcher-sub' + ); + }); + + it('substitutes all task.pending variables', () => { + const ctx = makeContext({ + taskFile: '/projects/test/tasks/todo.md', + taskFileName: 'todo.md', + taskFileDir: '/projects/test/tasks', + taskCount: '3', + taskList: '- [ ] task one\n- [ ] task two\n- [ ] task three', + taskContent: '# TODO\n- [ ] task one\n- [ ] task two\n- [ ] task three', + }); + const template = [ + 'file={{CUE_TASK_FILE}}', + 'name={{CUE_TASK_FILE_NAME}}', + 'dir={{CUE_TASK_FILE_DIR}}', + 'count={{CUE_TASK_COUNT}}', + 'list={{CUE_TASK_LIST}}', + 'content={{CUE_TASK_CONTENT}}', + ].join(' '); + const result = substituteTemplateVariables(template, ctx); + expect(result).toBe( + 'file=/projects/test/tasks/todo.md name=todo.md dir=/projects/test/tasks count=3 list=- [ ] task one\n- [ ] task two\n- [ ] task three content=# TODO\n- [ ] task one\n- [ ] task two\n- [ ] task three' + ); + }); + + it('substitutes all github variables', () => { + const ctx = makeContext({ + ghType: 'pull_request', + ghNumber: '42', + ghTitle: 'Add feature X', + ghAuthor: 'alice', + ghUrl: 'https://github.com/owner/repo/pull/42', + ghBody: 'This PR adds feature X', + ghLabels: 'enhancement,priority', + ghState: 'open', + ghRepo: 'owner/repo', + ghBranch: 'feature-x', + ghBaseBranch: 'main', + ghAssignees: 'bob,charlie', + ghMergedAt: '2026-03-15T12:00:00Z', + }); + const template = [ + 'type={{CUE_GH_TYPE}}', + 'num={{CUE_GH_NUMBER}}', + 'title={{CUE_GH_TITLE}}', + 'author={{CUE_GH_AUTHOR}}', + 'url={{CUE_GH_URL}}', + 'body={{CUE_GH_BODY}}', + 'labels={{CUE_GH_LABELS}}', + 'state={{CUE_GH_STATE}}', + 'repo={{CUE_GH_REPO}}', + 'branch={{CUE_GH_BRANCH}}', + 'base={{CUE_GH_BASE_BRANCH}}', + 'assignees={{CUE_GH_ASSIGNEES}}', + 'merged={{CUE_GH_MERGED_AT}}', + ].join(' '); + const result = substituteTemplateVariables(template, ctx); + expect(result).toBe( + 'type=pull_request num=42 title=Add feature X author=alice url=https://github.com/owner/repo/pull/42 body=This PR adds feature X labels=enhancement,priority state=open repo=owner/repo branch=feature-x base=main assignees=bob,charlie merged=2026-03-15T12:00:00Z' + ); + }); + + it('substitutes base event variables', () => { + const ctx = makeContext({ + eventType: 'file.changed', + eventTimestamp: '2026-03-15T10:30:00Z', + triggerName: 'watch-src', + runId: 'abc-123-def', + }); + const template = + 'event={{CUE_EVENT_TYPE}} ts={{CUE_EVENT_TIMESTAMP}} trigger={{CUE_TRIGGER_NAME}} run={{CUE_RUN_ID}}'; + const result = substituteTemplateVariables(template, ctx); + expect(result).toBe( + 'event=file.changed ts=2026-03-15T10:30:00Z trigger=watch-src run=abc-123-def' + ); + }); + + it('produces empty string for missing cue context fields', () => { + const ctx = makeContext({}); + const template = + 'event={{CUE_EVENT_TYPE}} file={{CUE_FILE_PATH}} session={{CUE_SOURCE_SESSION}} task={{CUE_TASK_FILE}} gh={{CUE_GH_TYPE}}'; + const result = substituteTemplateVariables(template, ctx); + expect(result).toBe('event= file= session= task= gh='); + }); + + it('handles special characters in variable values', () => { + const ctx = makeContext({ + sourceOutput: 'Line 1\nLine "2"\nCurly {braces} and {{double}}', + }); + const template = 'output={{CUE_SOURCE_OUTPUT}}'; + const result = substituteTemplateVariables(template, ctx); + expect(result).toBe('output=Line 1\nLine "2"\nCurly {braces} and {{double}}'); + }); + + it('preserves 5000-char sourceOutput without truncation', () => { + const longOutput = 'x'.repeat(5000); + const ctx = makeContext({ sourceOutput: longOutput }); + const template = '{{CUE_SOURCE_OUTPUT}}'; + const result = substituteTemplateVariables(template, ctx); + expect(result).toHaveLength(5000); + expect(result).toBe(longOutput); + }); +}); diff --git a/src/__tests__/main/cue/cue-yaml-loader.test.ts b/src/__tests__/main/cue/cue-yaml-loader.test.ts index 1642948fb..00d7a4c39 100644 --- a/src/__tests__/main/cue/cue-yaml-loader.test.ts +++ b/src/__tests__/main/cue/cue-yaml-loader.test.ts @@ -1312,6 +1312,116 @@ subscriptions: }); }); + describe('watch glob validation (Fix 6)', () => { + it('accepts valid glob pattern for file.changed', () => { + const result = validateCueConfig({ + subscriptions: [ + { + name: 'good-glob', + event: 'file.changed', + prompt: 'test', + watch: '**/*.ts', + }, + ], + }); + expect(result.valid).toBe(true); + }); + + it('accepts valid glob pattern for task.pending', () => { + const result = validateCueConfig({ + subscriptions: [ + { + name: 'good-task-glob', + event: 'task.pending', + prompt: 'test', + watch: 'docs/**/*.md', + }, + ], + }); + expect(result.valid).toBe(true); + }); + + it('rejects empty watch string for file.changed', () => { + const result = validateCueConfig({ + subscriptions: [ + { + name: 'empty-glob', + event: 'file.changed', + prompt: 'test', + watch: '', + }, + ], + }); + expect(result.valid).toBe(false); + expect(result.errors.some((e: string) => e.includes('watch'))).toBe(true); + }); + + it('rejects empty watch string for task.pending', () => { + const result = validateCueConfig({ + subscriptions: [ + { + name: 'empty-task-glob', + event: 'task.pending', + prompt: 'test', + watch: '', + }, + ], + }); + expect(result.valid).toBe(false); + expect(result.errors.some((e: string) => e.includes('watch'))).toBe(true); + }); + + it('picomatch accepts unbalanced bracket pattern without throwing', () => { + // picomatch treats [*.ts as a literal — it does NOT throw + // so the try/catch validation passes it as valid + const result = validateCueConfig({ + subscriptions: [ + { + name: 'unbalanced-bracket', + event: 'file.changed', + prompt: 'test', + watch: '[*.ts', + }, + ], + }); + // picomatch does not throw on this pattern, so validation passes + expect(result.valid).toBe(true); + }); + + it('accepts complex valid glob patterns', () => { + const patterns = ['src/**/*.{ts,tsx}', '*.md', 'docs/**/README.md', '!node_modules/**']; + for (const watch of patterns) { + const result = validateCueConfig({ + subscriptions: [ + { + name: `glob-${watch.replace(/[^a-z]/gi, '')}`, + event: 'file.changed', + prompt: 'test', + watch, + }, + ], + }); + const watchErrors = result.errors.filter((e: string) => e.includes('glob pattern')); + expect(watchErrors).toHaveLength(0); + } + }); + + it('rejects non-string watch value for file.changed', () => { + const result = validateCueConfig({ + subscriptions: [ + { + name: 'non-string-watch', + event: 'file.changed', + prompt: 'test', + watch: 123 as unknown as string, + }, + ], + }); + expect(result.valid).toBe(false); + expect(result.errors.some((e: string) => e.includes('watch'))).toBe(true); + }); + }); + describe('loadCueConfig with filter (continued)', () => { it('ignores filter with invalid nested values', () => { mockExistsSync.mockReturnValue(true); diff --git a/src/__tests__/renderer/components/CueModal.test.tsx b/src/__tests__/renderer/components/CueModal.test.tsx index 8a1f448b0..012281746 100644 --- a/src/__tests__/renderer/components/CueModal.test.tsx +++ b/src/__tests__/renderer/components/CueModal.test.tsx @@ -562,6 +562,52 @@ describe('CueModal', () => { }); }); + describe('edge cases', () => { + it('renders without crash when status has many sessions', () => { + const manySessions = Array.from({ length: 20 }, (_, i) => ({ + ...mockSession, + sessionId: `sess-${i}`, + sessionName: `Session ${i}`, + subscriptionCount: i + 1, + activeRuns: i % 3, + })); + + mockUseCueReturn = { + ...defaultUseCueReturn, + sessions: manySessions, + }; + + render(); + fireEvent.click(screen.getByText('Dashboard')); + + // All 20 sessions should be rendered + for (let i = 0; i < 20; i++) { + expect(screen.getByText(`Session ${i}`)).toBeInTheDocument(); + } + }); + + it('renders activity log entries with long names', () => { + const longName = 'A'.repeat(200); + const longSubName = 'B'.repeat(200); + const longNameRun = { + ...mockCompletedRun, + runId: 'run-long', + sessionName: longName, + subscriptionName: longSubName, + }; + + mockUseCueReturn = { + ...defaultUseCueReturn, + activityLog: [longNameRun], + }; + + render(); + fireEvent.click(screen.getByText('Dashboard')); + + expect(screen.getByText(/completed in 5s/)).toBeInTheDocument(); + }); + }); + describe('help view escape behavior', () => { it('should navigate to help view when help button is clicked', () => { render(); diff --git a/src/main/cue/cue-engine.ts b/src/main/cue/cue-engine.ts index 38e5bdd76..91ac3d8b4 100644 --- a/src/main/cue/cue-engine.ts +++ b/src/main/cue/cue-engine.ts @@ -47,6 +47,7 @@ const SOURCE_OUTPUT_MAX_CHARS = 5000; const HEARTBEAT_INTERVAL_MS = 30_000; // 30 seconds const SLEEP_THRESHOLD_MS = 120_000; // 2 minutes const EVENT_PRUNE_AGE_MS = 7 * 24 * 60 * 60 * 1000; // 7 days +const MAX_CHAIN_DEPTH = 10; const DAY_NAMES = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'] as const; @@ -123,6 +124,7 @@ interface FanInSourceCompletion { sessionName: string; output: string; truncated: boolean; + chainDepth: number; } /** A queued event waiting for a concurrency slot */ @@ -133,6 +135,7 @@ interface QueuedEvent { outputPrompt?: string; subscriptionName: string; queuedAt: number; + chainDepth?: number; } export class CueEngine { @@ -511,6 +514,17 @@ export class CueEngine { notifyAgentCompleted(sessionId: string, completionData?: AgentCompletionData): void { if (!this.enabled) return; + // Guard against infinite chain loops (A triggers B triggers A). + // chainDepth is propagated through AgentCompletionData so it persists across async hops. + const chainDepth = completionData?.chainDepth ?? 0; + if (chainDepth >= MAX_CHAIN_DEPTH) { + this.deps.onLog( + 'error', + `[CUE] Max chain depth (${MAX_CHAIN_DEPTH}) exceeded — aborting to prevent infinite loop` + ); + return; + } + // Resolve the completing session's name for matching const allSessions = this.deps.getSessions(); const completingSession = allSessions.find((s) => s.id === sessionId); @@ -559,7 +573,7 @@ export class CueEngine { } this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (agent.completed)`); - this.dispatchSubscription(ownerSessionId, sub, event, completingName); + this.dispatchSubscription(ownerSessionId, sub, event, completingName, chainDepth); } else { // Fan-in: track completions with data this.handleFanIn( @@ -601,7 +615,8 @@ export class CueEngine { ownerSessionId: string, sub: CueSubscription, event: CueEvent, - sourceSessionName: string + sourceSessionName: string, + chainDepth?: number ): void { if (sub.fan_out && sub.fan_out.length > 0) { // Fan-out: fire against each target session @@ -632,7 +647,8 @@ export class CueEngine { sub.prompt_file ?? sub.prompt, fanOutEvent, sub.name, - sub.output_prompt + sub.output_prompt, + chainDepth ); } } else { @@ -641,7 +657,8 @@ export class CueEngine { sub.prompt_file ?? sub.prompt, event, sub.name, - sub.output_prompt + sub.output_prompt, + chainDepth ); } } @@ -671,6 +688,7 @@ export class CueEngine { sessionName: completedSessionName, output: rawOutput.slice(-SOURCE_OUTPUT_MAX_CHARS), truncated: rawOutput.length > SOURCE_OUTPUT_MAX_CHARS, + chainDepth: completionData?.chainDepth ?? 0, }); // Start timeout timer on first source completion @@ -712,12 +730,14 @@ export class CueEngine { outputTruncated: completions.some((c) => c.truncated), }, }; + const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth)); this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (agent.completed, fan-in complete)`); this.dispatchSubscription( ownerSessionId, sub, event, - completions.map((c) => c.sessionName).join(', ') + completions.map((c) => c.sessionName).join(', '), + maxChainDepth ); } @@ -767,11 +787,18 @@ export class CueEngine { partial: true, }, }; + const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth)); this.deps.onLog( 'cue', `[CUE] Fan-in "${sub.name}" timed out (continue mode) — firing with ${completedNames.length}/${sources.length} sources` ); - this.dispatchSubscription(ownerSessionId, sub, event, completedNames.join(', ')); + this.dispatchSubscription( + ownerSessionId, + sub, + event, + completedNames.join(', '), + maxChainDepth + ); } else { // 'break' mode — log failure and clear this.fanInTrackers.delete(key); @@ -801,6 +828,24 @@ export class CueEngine { this.refreshSession(session.id, session.projectRoot); }); + // Warn about missing prompt files at setup time (not just at execution time) + for (const sub of config.subscriptions) { + if (sub.enabled === false) continue; + if (sub.agent_id && sub.agent_id !== session.id) continue; + if (sub.prompt_file && !sub.prompt) { + this.deps.onLog( + 'warn', + `[CUE] "${sub.name}" has prompt_file "${sub.prompt_file}" but the file was not found — subscription will fail on trigger` + ); + } + if (sub.output_prompt_file && !sub.output_prompt) { + this.deps.onLog( + 'warn', + `[CUE] "${sub.name}" has output_prompt_file "${sub.output_prompt_file}" but the file was not found` + ); + } + } + // Set up subscriptions for (const sub of config.subscriptions) { if (sub.enabled === false) continue; @@ -941,7 +986,7 @@ export class CueEngine { if (!times.includes(currentTime)) { // Evict stale fired-keys from previous minutes for (const key of this.scheduledFiredKeys) { - if (key.startsWith(`${sub.name}:`) && !key.endsWith(`:${currentTime}`)) { + if (key.startsWith(`${session.id}:${sub.name}:`) && !key.endsWith(`:${currentTime}`)) { this.scheduledFiredKeys.delete(key); } } @@ -949,7 +994,7 @@ export class CueEngine { } // Guard against double-fire (e.g., config refresh within the same minute) - const firedKey = `${sub.name}:${currentTime}`; + const firedKey = `${session.id}:${sub.name}:${currentTime}`; if (this.scheduledFiredKeys.has(firedKey)) { return; } @@ -1138,7 +1183,8 @@ export class CueEngine { prompt: string, event: CueEvent, subscriptionName: string, - outputPrompt?: string + outputPrompt?: string, + chainDepth?: number ): void { // Look up the config for this session to get concurrency settings const state = this.sessions.get(sessionId); @@ -1168,6 +1214,7 @@ export class CueEngine { outputPrompt, subscriptionName, queuedAt: Date.now(), + chainDepth, }); this.deps.onLog( @@ -1179,7 +1226,7 @@ export class CueEngine { // Slot available — dispatch immediately this.activeRunCount.set(sessionId, currentCount + 1); - this.doExecuteCueRun(sessionId, prompt, event, subscriptionName, outputPrompt); + this.doExecuteCueRun(sessionId, prompt, event, subscriptionName, outputPrompt, chainDepth); } /** @@ -1194,7 +1241,8 @@ export class CueEngine { prompt: string, event: CueEvent, subscriptionName: string, - outputPrompt?: string + outputPrompt?: string, + chainDepth?: number ): Promise { const session = this.deps.getSessions().find((s) => s.id === sessionId); const state = this.sessions.get(sessionId); @@ -1262,6 +1310,7 @@ export class CueEngine { `[CUE] "${subscriptionName}" executing output prompt for downstream handoff` ); + const outputRunId = crypto.randomUUID(); const outputEvent: CueEvent = { ...event, id: crypto.randomUUID(), @@ -1272,15 +1321,36 @@ export class CueEngine { }, }; + try { + recordCueEvent({ + id: outputRunId, + type: event.type, + triggerName: event.triggerName, + sessionId, + subscriptionName: `${subscriptionName}:output`, + status: 'running', + payload: JSON.stringify(outputEvent.payload), + }); + } catch { + // Non-fatal if DB is unavailable + } + const contextPrompt = `${outputPrompt}\n\n---\n\nContext from completed task:\n${result.stdout.substring(0, SOURCE_OUTPUT_MAX_CHARS)}`; const outputResult = await this.deps.onCueRun({ - runId, + runId: outputRunId, sessionId, prompt: contextPrompt, - subscriptionName, + subscriptionName: `${subscriptionName}:output`, event: outputEvent, timeoutMs, }); + + try { + updateCueEventStatus(outputRunId, outputResult.status); + } catch { + // Non-fatal if DB is unavailable + } + if (this.manuallyStoppedRuns.has(runId)) { return; } @@ -1342,6 +1412,7 @@ export class CueEngine { durationMs: result.durationMs, stdout: result.stdout, triggeredBy: subscriptionName, + chainDepth: (chainDepth ?? 0) + 1, }); } } @@ -1384,7 +1455,8 @@ export class CueEngine { entry.prompt, entry.event, entry.subscriptionName, - entry.outputPrompt + entry.outputPrompt, + entry.chainDepth ); } @@ -1417,6 +1489,18 @@ export class CueEngine { // Clean up fan-in trackers and timers for this session this.clearFanInState(sessionId); + + // Clean up queued events for this session (prevents stale events after config reload) + this.clearQueue(sessionId); + + // Clean up scheduledFiredKeys for this session's subscriptions + for (const sub of state.config.subscriptions) { + for (const key of this.scheduledFiredKeys) { + if (key.startsWith(`${sessionId}:${sub.name}:`)) { + this.scheduledFiredKeys.delete(key); + } + } + } } // --- Heartbeat & Sleep Detection --- diff --git a/src/main/cue/cue-github-poller.ts b/src/main/cue/cue-github-poller.ts index 1bd5e1c9b..8f65e7f97 100644 --- a/src/main/cue/cue-github-poller.ts +++ b/src/main/cue/cue-github-poller.ts @@ -64,6 +64,8 @@ export function createCueGitHubPoller(config: CueGitHubPollerConfig): () => void // Cached state let ghAvailable: boolean | null = null; let resolvedRepo: string | null = config.repo ?? null; + /** Tracks whether a poll has been attempted (success or failure) to prevent event flooding on recovery */ + let firstPollAttempted = false; async function checkGhAvailable(): Promise { if (ghAvailable !== null) return ghAvailable; @@ -245,6 +247,23 @@ export function createCueGitHubPoller(config: CueGitHubPollerConfig): () => void } catch (err) { const message = err instanceof Error ? err.message : String(err); onLog('error', `[CUE] GitHub poll error for "${triggerName}": ${message}`); + + // If the first poll ever fails, place a seed marker so the next successful + // poll doesn't treat ALL existing items as new (which would swallow items + // created during the outage by seeding them as "already seen") + if (!firstPollAttempted) { + try { + markGitHubItemSeen(subscriptionId, '__seed_marker__'); + onLog( + 'info', + `[CUE] First poll for "${triggerName}" failed — seed marker set to prevent silent event loss on recovery` + ); + } catch { + // Non-fatal: DB may not be available + } + } + } finally { + firstPollAttempted = true; } } diff --git a/src/main/cue/cue-types.ts b/src/main/cue/cue-types.ts index 7dd25391d..c1f474026 100644 --- a/src/main/cue/cue-types.ts +++ b/src/main/cue/cue-types.ts @@ -149,6 +149,8 @@ export interface AgentCompletionData { durationMs?: number; stdout?: string; triggeredBy?: string; + /** Tracks how many chained hops have occurred to prevent infinite loops */ + chainDepth?: number; } /** Session data with subscriptions for the Cue Graph visualization */ diff --git a/src/main/cue/cue-yaml-loader.ts b/src/main/cue/cue-yaml-loader.ts index 37014ec2c..42b327781 100644 --- a/src/main/cue/cue-yaml-loader.ts +++ b/src/main/cue/cue-yaml-loader.ts @@ -8,6 +8,7 @@ import * as fs from 'fs'; import * as path from 'path'; import * as yaml from 'js-yaml'; import * as chokidar from 'chokidar'; +import picomatch from 'picomatch'; import { type CueConfig, type CueSubscription, @@ -206,6 +207,17 @@ export function watchCueYaml(projectRoot: string, onChange: () => void): () => v }; } +/** Validates a glob pattern via picomatch, pushing an error if invalid. */ +function validateGlobPattern(pattern: string, prefix: string, errors: string[]): void { + try { + picomatch(pattern); + } catch (e) { + errors.push( + `${prefix}: "watch" value "${pattern}" is not a valid glob pattern: ${e instanceof Error ? e.message : String(e)}` + ); + } +} + /** * Validates a CueConfig-shaped object. Returns validation result with error messages. */ @@ -302,6 +314,8 @@ export function validateCueConfig(config: unknown): { valid: boolean; errors: st errors.push( `${prefix}: "watch" is required and must be a non-empty string for file.changed events` ); + } else { + validateGlobPattern(sub.watch as string, prefix, errors); } } else if (event === 'agent.completed') { if (!sub.source_session) { @@ -316,6 +330,8 @@ export function validateCueConfig(config: unknown): { valid: boolean; errors: st errors.push( `${prefix}: "watch" is required and must be a non-empty glob string for task.pending events` ); + } else { + validateGlobPattern(sub.watch as string, prefix, errors); } if (sub.poll_minutes !== undefined) { if (typeof sub.poll_minutes !== 'number' || sub.poll_minutes < 1) {