From 1bb0c1ee9f8c3363be817563762398940ba11438 Mon Sep 17 00:00:00 2001 From: Raza Rauf Date: Tue, 17 Mar 2026 18:20:47 -0600 Subject: [PATCH 1/7] refactor(cue): decompose cue-engine.ts into 5 focused modules (Phase 0+1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Delete CueGraphView.tsx (1,166 lines dead code, superseded by CuePipelineEditor) - Add cue-test-helpers.ts: shared createMockSession/Config/Deps factory across 6 test files - Deduplicate CueGraphSession: add to cue-pipeline-types.ts, remove local copies from CueModal and CuePipelineEditor - cue-engine.ts decomposition (1,574 → 719 lines, −55%): - cue-activity-log.ts: ring buffer (push/getAll/clear), ACTIVITY_LOG_MAX=500 - cue-heartbeat.ts: heartbeat writer + sleep/wake detection + reconciliation - cue-subscription-setup.ts: 5 setup functions (heartbeat, scheduled, file, GitHub, task) - cue-fan-in-tracker.ts: fan-in source tracking + completion dispatch - cue-run-manager.ts: concurrency gating, queue management, run lifecycle Key wiring decisions: - onRunCompleted callback resolves circular dep between run-manager and engine - onRunStopped callback ensures stopped runs appear in activity log - scheduledFiredKeys Set owned by engine, passed by ref to subscription setup - fanInTracker.reset() added to stop() to prevent timer leaks across restarts - calculateNextScheduledTime re-exported from engine for backwards compat Tests: 475 passing, 0 regressions; added stopRun→activityLog coverage --- .../main/cue/cue-activity-log.test.ts | 72 + .../main/cue/cue-completion-chains.test.ts | 43 +- .../main/cue/cue-concurrency.test.ts | 48 +- src/__tests__/main/cue/cue-engine.test.ts | 77 +- .../main/cue/cue-multi-hop-chains.test.ts | 43 +- .../main/cue/cue-session-lifecycle.test.ts | 43 +- src/__tests__/main/cue/cue-sleep-wake.test.ts | 40 +- src/__tests__/main/cue/cue-test-helpers.ts | 54 + .../renderer/components/CueModal.test.tsx | 5 - src/main/cue/cue-activity-log.ts | 40 + src/main/cue/cue-engine.ts | 1085 ++------------- src/main/cue/cue-fan-in-tracker.ts | 212 +++ src/main/cue/cue-heartbeat.ts | 102 ++ src/main/cue/cue-run-manager.ts | 412 ++++++ src/main/cue/cue-subscription-setup.ts | 365 ++++++ src/renderer/components/CueGraphView.tsx | 1166 ----------------- src/renderer/components/CueModal.tsx | 28 +- .../CuePipelineEditor/CuePipelineEditor.tsx | 25 +- src/shared/cue-pipeline-types.ts | 28 + 19 files changed, 1438 insertions(+), 2450 deletions(-) create mode 100644 src/__tests__/main/cue/cue-activity-log.test.ts create mode 100644 src/__tests__/main/cue/cue-test-helpers.ts create mode 100644 src/main/cue/cue-activity-log.ts create mode 100644 src/main/cue/cue-fan-in-tracker.ts create mode 100644 src/main/cue/cue-heartbeat.ts create mode 100644 src/main/cue/cue-run-manager.ts create mode 100644 src/main/cue/cue-subscription-setup.ts delete mode 100644 src/renderer/components/CueGraphView.tsx diff --git a/src/__tests__/main/cue/cue-activity-log.test.ts b/src/__tests__/main/cue/cue-activity-log.test.ts new file mode 100644 index 000000000..cb870a670 --- /dev/null +++ b/src/__tests__/main/cue/cue-activity-log.test.ts @@ -0,0 +1,72 @@ +/** + * Tests for the Cue activity log ring buffer. + */ + +import { describe, it, expect } from 'vitest'; +import { createCueActivityLog } from '../../../main/cue/cue-activity-log'; +import type { CueRunResult } from '../../../main/cue/cue-types'; + +function makeResult(id: string): CueRunResult { + return { + runId: id, + sessionId: 'session-1', + sessionName: 'Test', + subscriptionName: 'sub', + event: { id: 'e1', type: 'time.heartbeat', timestamp: '', triggerName: 'sub', payload: {} }, + status: 'completed', + stdout: '', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: '', + endedAt: '', + }; +} + +describe('createCueActivityLog', () => { + it('stores and retrieves results', () => { + const log = createCueActivityLog(); + log.push(makeResult('r1')); + log.push(makeResult('r2')); + expect(log.getAll()).toHaveLength(2); + expect(log.getAll()[0].runId).toBe('r1'); + }); + + it('respects limit parameter on getAll', () => { + const log = createCueActivityLog(); + log.push(makeResult('r1')); + log.push(makeResult('r2')); + log.push(makeResult('r3')); + const last2 = log.getAll(2); + expect(last2).toHaveLength(2); + expect(last2[0].runId).toBe('r2'); + expect(last2[1].runId).toBe('r3'); + }); + + it('evicts oldest entries when exceeding maxSize', () => { + const log = createCueActivityLog(3); + log.push(makeResult('r1')); + log.push(makeResult('r2')); + log.push(makeResult('r3')); + log.push(makeResult('r4')); + const all = log.getAll(); + expect(all).toHaveLength(3); + expect(all[0].runId).toBe('r2'); + expect(all[2].runId).toBe('r4'); + }); + + it('clear empties the log', () => { + const log = createCueActivityLog(); + log.push(makeResult('r1')); + log.clear(); + expect(log.getAll()).toHaveLength(0); + }); + + it('returns a copy from getAll, not a reference', () => { + const log = createCueActivityLog(); + log.push(makeResult('r1')); + const snapshot = log.getAll(); + log.push(makeResult('r2')); + expect(snapshot).toHaveLength(1); + }); +}); diff --git a/src/__tests__/main/cue/cue-completion-chains.test.ts b/src/__tests__/main/cue/cue-completion-chains.test.ts index cdbd9e1f5..a35c75255 100644 --- a/src/__tests__/main/cue/cue-completion-chains.test.ts +++ b/src/__tests__/main/cue/cue-completion-chains.test.ts @@ -14,7 +14,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { CueConfig, CueEvent } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; // Mock the yaml loader const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); @@ -52,47 +51,7 @@ vi.mock('crypto', () => ({ })); import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} - -function createMockConfig(overrides: Partial = {}): CueConfig { - return { - subscriptions: [], - settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, - ...overrides, - }; -} - -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async () => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: 'test', - event: {} as CueEvent, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })) as CueEngineDeps['onCueRun'], - onLog: vi.fn(), - ...overrides, - }; -} +import { createMockSession, createMockConfig, createMockDeps } from './cue-test-helpers'; describe('CueEngine completion chains', () => { beforeEach(() => { diff --git a/src/__tests__/main/cue/cue-concurrency.test.ts b/src/__tests__/main/cue/cue-concurrency.test.ts index 758e1aee2..03faa475e 100644 --- a/src/__tests__/main/cue/cue-concurrency.test.ts +++ b/src/__tests__/main/cue/cue-concurrency.test.ts @@ -13,7 +13,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; // Mock the yaml loader const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); @@ -35,52 +34,7 @@ vi.mock('crypto', () => ({ })); import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} - -function createMockConfig(overrides: Partial = {}): CueConfig { - return { - subscriptions: [], - settings: { - timeout_minutes: 30, - timeout_on_fail: 'break', - max_concurrent: 1, - queue_size: 10, - }, - ...overrides, - }; -} - -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async () => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: 'test', - event: {} as CueEvent, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })), - onLog: vi.fn(), - ...overrides, - }; -} +import { createMockSession, createMockConfig, createMockDeps } from './cue-test-helpers'; describe('CueEngine Concurrency Control', () => { let yamlWatcherCleanup: ReturnType; diff --git a/src/__tests__/main/cue/cue-engine.test.ts b/src/__tests__/main/cue/cue-engine.test.ts index 225399e4e..669f99610 100644 --- a/src/__tests__/main/cue/cue-engine.test.ts +++ b/src/__tests__/main/cue/cue-engine.test.ts @@ -15,7 +15,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; // Mock the yaml loader const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); @@ -53,48 +52,7 @@ import { calculateNextScheduledTime, type CueEngineDeps, } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} - -function createMockConfig(overrides: Partial = {}): CueConfig { - return { - subscriptions: [], - settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, - ...overrides, - }; -} - -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async (request: Parameters[0]) => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: request.subscriptionName, - event: request.event, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })), - onStopCueRun: vi.fn(() => true), - onLog: vi.fn(), - ...overrides, - }; -} +import { createMockSession, createMockConfig, createMockDeps } from './cue-test-helpers'; describe('CueEngine', () => { let yamlWatcherCleanup: ReturnType; @@ -930,6 +888,39 @@ describe('CueEngine', () => { engine.stop(); }); + it('stopRun adds the stopped run to the activity log', async () => { + const deps = createMockDeps({ + onCueRun: vi.fn(() => new Promise(() => {})), + }); + const config = createMockConfig({ + subscriptions: [ + { + name: 'timer', + event: 'time.heartbeat', + enabled: true, + prompt: 'test', + interval_minutes: 60, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const engine = new CueEngine(deps); + engine.start(); + + await vi.advanceTimersByTimeAsync(10); + + const activeRun = engine.getActiveRuns()[0]; + expect(activeRun).toBeDefined(); + engine.stopRun(activeRun.runId); + + const log = engine.getActivityLog(); + expect(log).toHaveLength(1); + expect(log[0].runId).toBe(activeRun.runId); + expect(log[0].status).toBe('stopped'); + + engine.stop(); + }); + it('stopAll clears all active runs', async () => { // Use a slow-resolving onCueRun to keep runs active const deps = createMockDeps({ diff --git a/src/__tests__/main/cue/cue-multi-hop-chains.test.ts b/src/__tests__/main/cue/cue-multi-hop-chains.test.ts index ad9c82eb9..170f07d70 100644 --- a/src/__tests__/main/cue/cue-multi-hop-chains.test.ts +++ b/src/__tests__/main/cue/cue-multi-hop-chains.test.ts @@ -12,7 +12,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; // Mock the yaml loader const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); @@ -50,47 +49,7 @@ vi.mock('crypto', () => ({ })); import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} - -function createMockConfig(overrides: Partial = {}): CueConfig { - return { - subscriptions: [], - settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, - ...overrides, - }; -} - -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async () => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: 'test', - event: {} as CueEvent, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })) as CueEngineDeps['onCueRun'], - onLog: vi.fn(), - ...overrides, - }; -} +import { createMockSession, createMockConfig, createMockDeps } from './cue-test-helpers'; describe('CueEngine multi-hop completion chains', () => { beforeEach(() => { diff --git a/src/__tests__/main/cue/cue-session-lifecycle.test.ts b/src/__tests__/main/cue/cue-session-lifecycle.test.ts index 185a422eb..916c59405 100644 --- a/src/__tests__/main/cue/cue-session-lifecycle.test.ts +++ b/src/__tests__/main/cue/cue-session-lifecycle.test.ts @@ -12,7 +12,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; // Mock the yaml loader const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); @@ -50,47 +49,7 @@ vi.mock('crypto', () => ({ })); import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} - -function createMockConfig(overrides: Partial = {}): CueConfig { - return { - subscriptions: [], - settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, - ...overrides, - }; -} - -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async () => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: 'test', - event: {} as CueEvent, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })) as CueEngineDeps['onCueRun'], - onLog: vi.fn(), - ...overrides, - }; -} +import { createMockSession, createMockConfig, createMockDeps } from './cue-test-helpers'; describe('CueEngine session lifecycle', () => { beforeEach(() => { diff --git a/src/__tests__/main/cue/cue-sleep-wake.test.ts b/src/__tests__/main/cue/cue-sleep-wake.test.ts index 022582455..00f47ad6e 100644 --- a/src/__tests__/main/cue/cue-sleep-wake.test.ts +++ b/src/__tests__/main/cue/cue-sleep-wake.test.ts @@ -10,8 +10,7 @@ */ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; -import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; +import type { CueConfig } from '../../../main/cue/cue-types'; // Track cue-db calls const mockInitCueDb = vi.fn(); @@ -52,19 +51,10 @@ vi.mock('crypto', () => ({ randomUUID: vi.fn(() => `uuid-${Math.random().toString(36).slice(2, 8)}`), })); -import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} +import { CueEngine } from '../../../main/cue/cue-engine'; +import { createMockDeps } from './cue-test-helpers'; +/** Sleep-wake tests need a config with a default timer subscription */ function createMockConfig(overrides: Partial = {}): CueConfig { return { subscriptions: [ @@ -81,28 +71,6 @@ function createMockConfig(overrides: Partial = {}): CueConfig { }; } -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async () => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: 'test', - event: {} as CueEvent, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })), - onLog: vi.fn(), - ...overrides, - }; -} - describe('CueEngine sleep/wake detection', () => { beforeEach(() => { vi.clearAllMocks(); diff --git a/src/__tests__/main/cue/cue-test-helpers.ts b/src/__tests__/main/cue/cue-test-helpers.ts new file mode 100644 index 000000000..ddf4b0f01 --- /dev/null +++ b/src/__tests__/main/cue/cue-test-helpers.ts @@ -0,0 +1,54 @@ +/** + * Shared test factories for Cue engine tests. + * + * Provides createMockSession, createMockConfig, and createMockDeps + * used across 6+ Cue test files. Centralizes the factory functions + * to avoid duplication and ensure consistent defaults. + */ + +import { vi } from 'vitest'; +import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; +import type { SessionInfo } from '../../../shared/types'; +import type { CueEngineDeps } from '../../../main/cue/cue-engine'; + +export function createMockSession(overrides: Partial = {}): SessionInfo { + return { + id: 'session-1', + name: 'Test Session', + toolType: 'claude-code', + cwd: '/projects/test', + projectRoot: '/projects/test', + ...overrides, + }; +} + +export function createMockConfig(overrides: Partial = {}): CueConfig { + return { + subscriptions: [], + settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, + ...overrides, + }; +} + +export function createMockDeps(overrides: Partial = {}): CueEngineDeps { + return { + getSessions: vi.fn(() => [createMockSession()]), + onCueRun: vi.fn(async (request: Parameters[0]) => ({ + runId: 'run-1', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: request.subscriptionName, + event: request.event, + status: 'completed' as const, + stdout: 'output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + })), + onStopCueRun: vi.fn(() => true), + onLog: vi.fn(), + ...overrides, + }; +} diff --git a/src/__tests__/renderer/components/CueModal.test.tsx b/src/__tests__/renderer/components/CueModal.test.tsx index 012281746..6b4ee2ced 100644 --- a/src/__tests__/renderer/components/CueModal.test.tsx +++ b/src/__tests__/renderer/components/CueModal.test.tsx @@ -41,11 +41,6 @@ vi.mock('../../../renderer/components/CueYamlEditor', () => ({ isOpen ?
YAML Editor Mock
: null, })); -// Mock CueGraphView (kept for reference - replaced by CuePipelineEditor) -vi.mock('../../../renderer/components/CueGraphView', () => ({ - CueGraphView: () => null, -})); - // Capture the onDirtyChange callback from CuePipelineEditor let capturedOnDirtyChange: ((isDirty: boolean) => void) | undefined; diff --git a/src/main/cue/cue-activity-log.ts b/src/main/cue/cue-activity-log.ts new file mode 100644 index 000000000..394b37f29 --- /dev/null +++ b/src/main/cue/cue-activity-log.ts @@ -0,0 +1,40 @@ +/** + * In-memory ring buffer of completed Cue run results. + * + * Keeps the most recent N results for the activity log view + * in the Cue Modal dashboard. + */ + +import type { CueRunResult } from './cue-types'; + +const ACTIVITY_LOG_MAX = 500; + +export interface CueActivityLog { + push(result: CueRunResult): void; + getAll(limit?: number): CueRunResult[]; + clear(): void; +} + +export function createCueActivityLog(maxSize: number = ACTIVITY_LOG_MAX): CueActivityLog { + let log: CueRunResult[] = []; + + return { + push(result: CueRunResult): void { + log.push(result); + if (log.length > maxSize) { + log = log.slice(-maxSize); + } + }, + + getAll(limit?: number): CueRunResult[] { + if (limit !== undefined) { + return log.slice(-limit); + } + return [...log]; + }, + + clear(): void { + log = []; + }, + }; +} diff --git a/src/main/cue/cue-engine.ts b/src/main/cue/cue-engine.ts index 91ac3d8b4..c9a86a08b 100644 --- a/src/main/cue/cue-engine.ts +++ b/src/main/cue/cue-engine.ts @@ -25,67 +25,28 @@ import type { } from './cue-types'; import { DEFAULT_CUE_SETTINGS } from './cue-types'; import { loadCueConfig, watchCueYaml } from './cue-yaml-loader'; -import { createCueFileWatcher } from './cue-file-watcher'; -import { createCueGitHubPoller } from './cue-github-poller'; -import { createCueTaskScanner } from './cue-task-scanner'; import { matchesFilter, describeFilter } from './cue-filter'; import { - initCueDb, - closeCueDb, - updateHeartbeat, - getLastHeartbeat, - pruneCueEvents, - recordCueEvent, - updateCueEventStatus, -} from './cue-db'; -import { reconcileMissedTimeEvents } from './cue-reconciler'; -import type { ReconcileSessionInfo } from './cue-reconciler'; - -const ACTIVITY_LOG_MAX = 500; -const DEFAULT_FILE_DEBOUNCE_MS = 5000; -const SOURCE_OUTPUT_MAX_CHARS = 5000; -const HEARTBEAT_INTERVAL_MS = 30_000; // 30 seconds -const SLEEP_THRESHOLD_MS = 120_000; // 2 minutes -const EVENT_PRUNE_AGE_MS = 7 * 24 * 60 * 60 * 1000; // 7 days + setupHeartbeatSubscription, + setupScheduledSubscription, + setupFileWatcherSubscription, + setupGitHubPollerSubscription, + setupTaskScannerSubscription, + type SubscriptionSetupDeps, +} from './cue-subscription-setup'; +import { initCueDb, closeCueDb, pruneCueEvents } from './cue-db'; +import { createCueActivityLog } from './cue-activity-log'; +import type { CueActivityLog } from './cue-activity-log'; +import { createCueHeartbeat, EVENT_PRUNE_AGE_MS } from './cue-heartbeat'; +import type { CueHeartbeat } from './cue-heartbeat'; +import { createCueFanInTracker, SOURCE_OUTPUT_MAX_CHARS } from './cue-fan-in-tracker'; +import type { CueFanInTracker } from './cue-fan-in-tracker'; +import { createCueRunManager } from './cue-run-manager'; +import type { CueRunManager } from './cue-run-manager'; const MAX_CHAIN_DEPTH = 10; -const DAY_NAMES = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'] as const; - -/** - * Calculates the next occurrence of a scheduled time. - * Returns a timestamp in ms, or null if inputs are invalid. - */ -export function calculateNextScheduledTime(times: string[], days?: string[]): number | null { - if (times.length === 0) return null; - - const now = new Date(); - const candidates: number[] = []; - - // Check up to 7 days ahead to find the next match - for (let dayOffset = 0; dayOffset < 7; dayOffset++) { - const candidate = new Date(now); - candidate.setDate(candidate.getDate() + dayOffset); - const dayName = DAY_NAMES[candidate.getDay()]; - - if (days && days.length > 0 && !days.includes(dayName)) continue; - - for (const time of times) { - const [hourStr, minStr] = time.split(':'); - const hour = parseInt(hourStr, 10); - const min = parseInt(minStr, 10); - if (isNaN(hour) || isNaN(min)) continue; - - const target = new Date(candidate); - target.setHours(hour, min, 0, 0); - - if (target.getTime() > now.getTime()) { - candidates.push(target.getTime()); - } - } - } - - return candidates.length > 0 ? Math.min(...candidates) : null; -} +// Re-export for backwards compat (tests import from cue-engine) +export { calculateNextScheduledTime } from './cue-subscription-setup'; /** Dependencies injected into the CueEngine */ export interface CueEngineDeps { @@ -112,50 +73,73 @@ interface SessionState { nextTriggers: Map; // subscriptionName -> next trigger timestamp } -/** Active run tracking */ -interface ActiveRun { - result: CueRunResult; - abortController?: AbortController; -} - -/** Stored data for a single fan-in source completion */ -interface FanInSourceCompletion { - sessionId: string; - sessionName: string; - output: string; - truncated: boolean; - chainDepth: number; -} - -/** A queued event waiting for a concurrency slot */ -interface QueuedEvent { - event: CueEvent; - subscription: CueSubscription; - prompt: string; - outputPrompt?: string; - subscriptionName: string; - queuedAt: number; - chainDepth?: number; -} - export class CueEngine { private enabled = false; private sessions = new Map(); - private activeRuns = new Map(); - private activityLog: CueRunResult[] = []; - private fanInTrackers = new Map>(); - private fanInTimers = new Map>(); + private activityLog: CueActivityLog = createCueActivityLog(); + private fanInTracker!: CueFanInTracker; + private runManager!: CueRunManager; private pendingYamlWatchers = new Map void>(); - private activeRunCount = new Map(); - private eventQueue = new Map(); - private manuallyStoppedRuns = new Set(); /** Tracks "subName:HH:MM" keys that time.scheduled already fired, preventing double-fire on config refresh */ private scheduledFiredKeys = new Set(); - private heartbeatInterval: ReturnType | null = null; + private heartbeat: CueHeartbeat; private deps: CueEngineDeps; constructor(deps: CueEngineDeps) { this.deps = deps; + this.runManager = createCueRunManager({ + getSessions: deps.getSessions, + getSessionSettings: (sessionId) => this.sessions.get(sessionId)?.config.settings, + onCueRun: deps.onCueRun, + onStopCueRun: deps.onStopCueRun, + onLog: deps.onLog, + onRunCompleted: (sessionId, result, subscriptionName, chainDepth) => { + this.pushActivityLog(result); + this.notifyAgentCompleted(sessionId, { + sessionName: result.sessionName, + status: result.status, + exitCode: result.exitCode, + durationMs: result.durationMs, + stdout: result.stdout, + triggeredBy: subscriptionName, + chainDepth: (chainDepth ?? 0) + 1, + }); + }, + onRunStopped: (result) => { + this.pushActivityLog(result); + }, + }); + this.fanInTracker = createCueFanInTracker({ + onLog: deps.onLog, + getSessions: deps.getSessions, + dispatchSubscription: (ownerSessionId, sub, event, sourceSessionName, chainDepth) => { + this.dispatchSubscription(ownerSessionId, sub, event, sourceSessionName, chainDepth); + }, + }); + this.heartbeat = createCueHeartbeat({ + onLog: deps.onLog, + getSessions: () => { + const result = new Map(); + const allSessions = deps.getSessions(); + for (const [sessionId, state] of this.sessions) { + const session = allSessions.find((s) => s.id === sessionId); + result.set(sessionId, { + config: state.config, + sessionName: session?.name ?? sessionId, + }); + } + return result; + }, + onDispatch: (sessionId, sub, event) => { + this.runManager.execute( + sessionId, + sub.prompt_file ?? sub.prompt, + event, + sub.name, + sub.output_prompt + ); + }, + }); } /** Enable the engine and scan all sessions for Cue configs */ @@ -179,10 +163,10 @@ export class CueEngine { } // Detect sleep gap from previous heartbeat - this.detectSleepAndReconcile(); + this.heartbeat.detectSleepAndReconcile(); // Start heartbeat writer (30s interval) - this.startHeartbeat(); + this.heartbeat.start(); } /** Disable the engine, clearing all timers and watchers */ @@ -201,14 +185,13 @@ export class CueEngine { } this.pendingYamlWatchers.clear(); - // Clear concurrency state - this.eventQueue.clear(); - this.activeRunCount.clear(); - this.manuallyStoppedRuns.clear(); + // Clear concurrency and fan-in state + this.runManager.reset(); + this.fanInTracker.reset(); this.scheduledFiredKeys.clear(); // Stop heartbeat and close database - this.stopHeartbeat(); + this.heartbeat.stop(); try { closeCueDb(); } catch { @@ -262,8 +245,7 @@ export class CueEngine { removeSession(sessionId: string): void { this.teardownSession(sessionId); this.sessions.delete(sessionId); - this.clearQueue(sessionId); - this.activeRunCount.delete(sessionId); + this.runManager.clearQueue(sessionId); const pendingWatcher = this.pendingYamlWatchers.get(sessionId); if (pendingWatcher) { @@ -287,7 +269,7 @@ export class CueEngine { reportedSessionIds.add(sessionId); - const activeRunCount = [...this.activeRuns.values()].filter( + const activeRunCount = [...this.runManager.getActiveRunMap().values()].filter( (r) => r.result.sessionId === sessionId ).length; @@ -338,46 +320,23 @@ export class CueEngine { /** Returns currently running Cue executions */ getActiveRuns(): CueRunResult[] { - return [...this.activeRuns.values()].map((r) => r.result); + return this.runManager.getActiveRuns(); } /** Returns recent completed/failed runs */ getActivityLog(limit?: number): CueRunResult[] { - if (limit !== undefined) { - return this.activityLog.slice(-limit); - } - return [...this.activityLog]; + return this.activityLog.getAll(limit); } /** Stops a specific running execution */ stopRun(runId: string): boolean { - const run = this.activeRuns.get(runId); - if (!run) return false; - - this.manuallyStoppedRuns.add(runId); - this.deps.onStopCueRun?.(runId); - run.abortController?.abort(); - run.result.status = 'stopped'; - run.result.endedAt = new Date().toISOString(); - run.result.durationMs = Date.now() - new Date(run.result.startedAt).getTime(); - - this.activeRuns.delete(runId); - this.pushActivityLog(run.result); - this.deps.onLog('cue', `[CUE] Run stopped: ${runId}`, { - type: 'runStopped', - runId, - sessionId: run.result.sessionId, - subscriptionName: run.result.subscriptionName, - }); - return true; + const result = this.runManager.stopRun(runId); + return result; } /** Stops all running executions and clears all queues */ stopAll(): void { - for (const [runId] of this.activeRuns) { - this.stopRun(runId); - } - this.eventQueue.clear(); + this.runManager.stopAll(); } /** Returns master enabled state */ @@ -387,13 +346,7 @@ export class CueEngine { /** Returns queue depth per session (for the Cue Modal) */ getQueueStatus(): Map { - const result = new Map(); - for (const [sessionId, queue] of this.eventQueue) { - if (queue.length > 0) { - result.set(sessionId, queue.length); - } - } - return result; + return this.runManager.getQueueStatus(); } /** Returns the merged Cue settings from the first available session config */ @@ -476,7 +429,7 @@ export class CueEngine { /** Clears queued events for a session */ clearQueue(sessionId: string): void { - this.eventQueue.delete(sessionId); + this.runManager.clearQueue(sessionId); } /** @@ -576,9 +529,9 @@ export class CueEngine { this.dispatchSubscription(ownerSessionId, sub, event, completingName, chainDepth); } else { // Fan-in: track completions with data - this.handleFanIn( + this.fanInTracker.handleCompletion( ownerSessionId, - state, + state.config.settings, sub, sources, sessionId, @@ -592,16 +545,7 @@ export class CueEngine { /** Clear all fan-in state for a session (when Cue is disabled or session removed) */ clearFanInState(sessionId: string): void { - for (const key of [...this.fanInTrackers.keys()]) { - if (key.startsWith(`${sessionId}:`)) { - this.fanInTrackers.delete(key); - const timer = this.fanInTimers.get(key); - if (timer) { - clearTimeout(timer); - this.fanInTimers.delete(key); - } - } - } + this.fanInTracker.clearForSession(sessionId); } // --- Private methods --- @@ -642,7 +586,7 @@ export class CueEngine { fanOutIndex: i, }, }; - this.executeCueRun( + this.runManager.execute( targetSession.id, sub.prompt_file ?? sub.prompt, fanOutEvent, @@ -652,7 +596,7 @@ export class CueEngine { ); } } else { - this.executeCueRun( + this.runManager.execute( ownerSessionId, sub.prompt_file ?? sub.prompt, event, @@ -663,152 +607,6 @@ export class CueEngine { } } - /** - * Handle fan-in logic: track which sources have completed, fire when all done. - * Supports timeout handling based on the subscription's settings. - */ - private handleFanIn( - ownerSessionId: string, - state: SessionState, - sub: CueSubscription, - sources: string[], - completedSessionId: string, - completedSessionName: string, - completionData?: AgentCompletionData - ): void { - const key = `${ownerSessionId}:${sub.name}`; - - if (!this.fanInTrackers.has(key)) { - this.fanInTrackers.set(key, new Map()); - } - const tracker = this.fanInTrackers.get(key)!; - const rawOutput = completionData?.stdout ?? ''; - tracker.set(completedSessionId, { - sessionId: completedSessionId, - sessionName: completedSessionName, - output: rawOutput.slice(-SOURCE_OUTPUT_MAX_CHARS), - truncated: rawOutput.length > SOURCE_OUTPUT_MAX_CHARS, - chainDepth: completionData?.chainDepth ?? 0, - }); - - // Start timeout timer on first source completion - if (tracker.size === 1 && !this.fanInTimers.has(key)) { - const timeoutMs = (state.config.settings.timeout_minutes ?? 30) * 60 * 1000; - const timer = setTimeout(() => { - this.handleFanInTimeout(key, ownerSessionId, state, sub, sources); - }, timeoutMs); - this.fanInTimers.set(key, timer); - } - - const remaining = sources.length - tracker.size; - if (remaining > 0) { - this.deps.onLog( - 'cue', - `[CUE] Fan-in "${sub.name}": waiting for ${remaining} more session(s)` - ); - return; - } - - // All sources completed — clear timer and fire - const timer = this.fanInTimers.get(key); - if (timer) { - clearTimeout(timer); - this.fanInTimers.delete(key); - } - this.fanInTrackers.delete(key); - - const completions = [...tracker.values()]; - const event: CueEvent = { - id: crypto.randomUUID(), - type: 'agent.completed', - timestamp: new Date().toISOString(), - triggerName: sub.name, - payload: { - completedSessions: completions.map((c) => c.sessionId), - sourceSession: completions.map((c) => c.sessionName).join(', '), - sourceOutput: completions.map((c) => c.output).join('\n---\n'), - outputTruncated: completions.some((c) => c.truncated), - }, - }; - const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth)); - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (agent.completed, fan-in complete)`); - this.dispatchSubscription( - ownerSessionId, - sub, - event, - completions.map((c) => c.sessionName).join(', '), - maxChainDepth - ); - } - - /** - * Handle fan-in timeout. Behavior depends on timeout_on_fail setting: - * - 'break': log failure and clear the tracker - * - 'continue': fire the downstream subscription with partial data - */ - private handleFanInTimeout( - key: string, - ownerSessionId: string, - state: SessionState, - sub: CueSubscription, - sources: string[] - ): void { - this.fanInTimers.delete(key); - const tracker = this.fanInTrackers.get(key); - if (!tracker) return; - - const completedNames = [...tracker.values()].map((c) => c.sessionName); - const completedIds = [...tracker.keys()]; - - // Determine which sources haven't completed yet - const allSessions = this.deps.getSessions(); - const timedOutSources = sources.filter((src) => { - const session = allSessions.find((s) => s.name === src || s.id === src); - const sessionId = session?.id ?? src; - return !completedIds.includes(sessionId) && !completedIds.includes(src); - }); - - if (state.config.settings.timeout_on_fail === 'continue') { - // Fire with partial data - const completions = [...tracker.values()]; - this.fanInTrackers.delete(key); - - const event: CueEvent = { - id: crypto.randomUUID(), - type: 'agent.completed', - timestamp: new Date().toISOString(), - triggerName: sub.name, - payload: { - completedSessions: completions.map((c) => c.sessionId), - timedOutSessions: timedOutSources, - sourceSession: completions.map((c) => c.sessionName).join(', '), - sourceOutput: completions.map((c) => c.output).join('\n---\n'), - outputTruncated: completions.some((c) => c.truncated), - partial: true, - }, - }; - const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth)); - this.deps.onLog( - 'cue', - `[CUE] Fan-in "${sub.name}" timed out (continue mode) — firing with ${completedNames.length}/${sources.length} sources` - ); - this.dispatchSubscription( - ownerSessionId, - sub, - event, - completedNames.join(', '), - maxChainDepth - ); - } else { - // 'break' mode — log failure and clear - this.fanInTrackers.delete(key); - this.deps.onLog( - 'cue', - `[CUE] Fan-in "${sub.name}" timed out (break mode) — ${completedNames.length}/${sources.length} completed, waiting for: ${timedOutSources.join(', ')}` - ); - } - } - private initSession(session: SessionInfo): void { if (!this.enabled) return; @@ -847,21 +645,30 @@ export class CueEngine { } // Set up subscriptions + const setupDeps: SubscriptionSetupDeps = { + enabled: () => this.enabled, + scheduledFiredKeys: this.scheduledFiredKeys, + onLog: this.deps.onLog, + executeCueRun: (sid, prompt, event, subName, outputPrompt) => { + this.runManager.execute(sid, prompt, event, subName, outputPrompt); + }, + }; + for (const sub of config.subscriptions) { if (sub.enabled === false) continue; // Skip subscriptions bound to a different agent if (sub.agent_id && sub.agent_id !== session.id) continue; if (sub.event === 'time.heartbeat' && sub.interval_minutes) { - this.setupHeartbeatSubscription(session, state, sub); + setupHeartbeatSubscription(setupDeps, session, state, sub); } else if (sub.event === 'time.scheduled' && sub.schedule_times?.length) { - this.setupScheduledSubscription(session, state, sub); + setupScheduledSubscription(setupDeps, session, state, sub); } else if (sub.event === 'file.changed' && sub.watch) { - this.setupFileWatcherSubscription(session, state, sub); + setupFileWatcherSubscription(setupDeps, session, state, sub); } else if (sub.event === 'task.pending' && sub.watch) { - this.setupTaskScannerSubscription(session, state, sub); + setupTaskScannerSubscription(setupDeps, session, state, sub); } else if (sub.event === 'github.pull_request' || sub.event === 'github.issue') { - this.setupGitHubPollerSubscription(session, state, sub); + setupGitHubPollerSubscription(setupDeps, session, state, sub); } // agent.completed subscriptions are handled reactively via notifyAgentCompleted } @@ -873,604 +680,8 @@ export class CueEngine { ); } - private setupHeartbeatSubscription( - session: SessionInfo, - state: SessionState, - sub: { - name: string; - prompt: string; - prompt_file?: string; - output_prompt?: string; - interval_minutes?: number; - filter?: Record; - } - ): void { - const intervalMs = (sub.interval_minutes ?? 0) * 60 * 1000; - if (intervalMs <= 0) return; - - // Fire immediately on first setup - const immediateEvent: CueEvent = { - id: crypto.randomUUID(), - type: 'time.heartbeat', - timestamp: new Date().toISOString(), - triggerName: sub.name, - payload: { interval_minutes: sub.interval_minutes }, - }; - - // Check payload filter (even for timer events) - if (!sub.filter || matchesFilter(immediateEvent.payload, sub.filter)) { - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.heartbeat, initial)`); - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - immediateEvent, - sub.name, - sub.output_prompt - ); - } else { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - } - - // Then on the interval - const timer = setInterval(() => { - if (!this.enabled) return; - - const event: CueEvent = { - id: crypto.randomUUID(), - type: 'time.heartbeat', - timestamp: new Date().toISOString(), - triggerName: sub.name, - payload: { interval_minutes: sub.interval_minutes }, - }; - - // Check payload filter - if (sub.filter && !matchesFilter(event.payload, sub.filter)) { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - return; - } - - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.heartbeat)`); - state.lastTriggered = event.timestamp; - state.nextTriggers.set(sub.name, Date.now() + intervalMs); - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }, intervalMs); - - state.nextTriggers.set(sub.name, Date.now() + intervalMs); - state.timers.push(timer); - } - - private setupScheduledSubscription( - session: SessionInfo, - state: SessionState, - sub: { - name: string; - prompt: string; - prompt_file?: string; - output_prompt?: string; - schedule_times?: string[]; - schedule_days?: string[]; - filter?: Record; - } - ): void { - const times = sub.schedule_times ?? []; - if (times.length === 0) return; - - const checkAndFire = () => { - if (!this.enabled) return; - - const now = new Date(); - const dayNames = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat']; - const currentDay = dayNames[now.getDay()]; - const currentTime = `${String(now.getHours()).padStart(2, '0')}:${String(now.getMinutes()).padStart(2, '0')}`; - - // Check day filter (if specified, current day must match) - if (sub.schedule_days && sub.schedule_days.length > 0) { - if (!sub.schedule_days.includes(currentDay)) { - return; - } - } - - // Check if current time matches any scheduled time - if (!times.includes(currentTime)) { - // Evict stale fired-keys from previous minutes - for (const key of this.scheduledFiredKeys) { - if (key.startsWith(`${session.id}:${sub.name}:`) && !key.endsWith(`:${currentTime}`)) { - this.scheduledFiredKeys.delete(key); - } - } - return; - } - - // Guard against double-fire (e.g., config refresh within the same minute) - const firedKey = `${session.id}:${sub.name}:${currentTime}`; - if (this.scheduledFiredKeys.has(firedKey)) { - return; - } - this.scheduledFiredKeys.add(firedKey); - - const event: CueEvent = { - id: crypto.randomUUID(), - type: 'time.scheduled', - timestamp: now.toISOString(), - triggerName: sub.name, - payload: { - schedule_times: sub.schedule_times, - schedule_days: sub.schedule_days, - matched_time: currentTime, - matched_day: currentDay, - }, - }; - - if (sub.filter && !matchesFilter(event.payload, sub.filter)) { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - return; - } - - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.scheduled, ${currentTime})`); - state.lastTriggered = event.timestamp; - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }; - - // Check every 60 seconds to catch scheduled times - const timer = setInterval(checkAndFire, 60_000); - state.timers.push(timer); - - // Calculate and track the next trigger time - const nextMs = calculateNextScheduledTime(times, sub.schedule_days); - if (nextMs != null) { - state.nextTriggers.set(sub.name, nextMs); - } - } - - private setupFileWatcherSubscription( - session: SessionInfo, - state: SessionState, - sub: { - name: string; - prompt: string; - prompt_file?: string; - output_prompt?: string; - watch?: string; - filter?: Record; - } - ): void { - if (!sub.watch) return; - - const cleanup = createCueFileWatcher({ - watchGlob: sub.watch, - projectRoot: session.projectRoot, - debounceMs: DEFAULT_FILE_DEBOUNCE_MS, - triggerName: sub.name, - onLog: (level, message) => this.deps.onLog(level as MainLogLevel, message), - onEvent: (event) => { - if (!this.enabled) return; - - // Check payload filter - if (sub.filter && !matchesFilter(event.payload, sub.filter)) { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - return; - } - - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (file.changed)`); - state.lastTriggered = event.timestamp; - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }, - }); - - state.watchers.push(cleanup); - } - - private setupGitHubPollerSubscription( - session: SessionInfo, - state: SessionState, - sub: CueSubscription - ): void { - const cleanup = createCueGitHubPoller({ - eventType: sub.event as 'github.pull_request' | 'github.issue', - repo: sub.repo, - pollMinutes: sub.poll_minutes ?? 5, - projectRoot: session.projectRoot, - triggerName: sub.name, - subscriptionId: `${session.id}:${sub.name}`, - ghState: sub.gh_state, - onLog: (level, message) => this.deps.onLog(level as MainLogLevel, message), - onEvent: (event) => { - if (!this.enabled) return; - - // Check payload filter - if (sub.filter && !matchesFilter(event.payload, sub.filter)) { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - return; - } - - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (${sub.event})`); - state.lastTriggered = event.timestamp; - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }, - }); - - state.watchers.push(cleanup); - } - - private setupTaskScannerSubscription( - session: SessionInfo, - state: SessionState, - sub: CueSubscription - ): void { - if (!sub.watch) return; - - const cleanup = createCueTaskScanner({ - watchGlob: sub.watch, - pollMinutes: sub.poll_minutes ?? 1, - projectRoot: session.projectRoot, - triggerName: sub.name, - onLog: (level, message) => this.deps.onLog(level as MainLogLevel, message), - onEvent: (event) => { - if (!this.enabled) return; - - // Check payload filter - if (sub.filter && !matchesFilter(event.payload, sub.filter)) { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - return; - } - - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" triggered (task.pending: ${event.payload.taskCount} task(s) in ${event.payload.filename})` - ); - state.lastTriggered = event.timestamp; - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }, - }); - - state.watchers.push(cleanup); - } - - /** - * Gate for concurrency control. Checks if a slot is available for this session. - * If at limit, queues the event. Otherwise dispatches immediately. - */ - private executeCueRun( - sessionId: string, - prompt: string, - event: CueEvent, - subscriptionName: string, - outputPrompt?: string, - chainDepth?: number - ): void { - // Look up the config for this session to get concurrency settings - const state = this.sessions.get(sessionId); - const maxConcurrent = state?.config.settings.max_concurrent ?? 1; - const queueSize = state?.config.settings.queue_size ?? 10; - const currentCount = this.activeRunCount.get(sessionId) ?? 0; - - if (currentCount >= maxConcurrent) { - // At concurrency limit — queue the event - const sessionName = - this.deps.getSessions().find((s) => s.id === sessionId)?.name ?? sessionId; - if (!this.eventQueue.has(sessionId)) { - this.eventQueue.set(sessionId, []); - } - const queue = this.eventQueue.get(sessionId)!; - - if (queue.length >= queueSize) { - // Drop the oldest entry - queue.shift(); - this.deps.onLog('cue', `[CUE] Queue full for "${sessionName}", dropping oldest event`); - } - - queue.push({ - event, - subscription: { name: subscriptionName, event: event.type, enabled: true, prompt }, - prompt, - outputPrompt, - subscriptionName, - queuedAt: Date.now(), - chainDepth, - }); - - this.deps.onLog( - 'cue', - `[CUE] Event queued for "${sessionName}" (${queue.length}/${queueSize} in queue, ${currentCount}/${maxConcurrent} concurrent)` - ); - return; - } - - // Slot available — dispatch immediately - this.activeRunCount.set(sessionId, currentCount + 1); - this.doExecuteCueRun(sessionId, prompt, event, subscriptionName, outputPrompt, chainDepth); - } - - /** - * Actually executes a Cue run. Called when a concurrency slot is available. - * - * If outputPrompt is provided, a second run is executed after the main task - * completes successfully. The output prompt receives the main task's stdout - * as context, and its output replaces the stdout passed downstream. - */ - private async doExecuteCueRun( - sessionId: string, - prompt: string, - event: CueEvent, - subscriptionName: string, - outputPrompt?: string, - chainDepth?: number - ): Promise { - const session = this.deps.getSessions().find((s) => s.id === sessionId); - const state = this.sessions.get(sessionId); - const runId = crypto.randomUUID(); - const abortController = new AbortController(); - - const result: CueRunResult = { - runId, - sessionId, - sessionName: session?.name ?? 'Unknown', - subscriptionName, - event, - status: 'running', - stdout: '', - stderr: '', - exitCode: null, - durationMs: 0, - startedAt: new Date().toISOString(), - endedAt: '', - }; - - this.activeRuns.set(runId, { result, abortController }); - const timeoutMs = (state?.config.settings.timeout_minutes ?? 30) * 60 * 1000; - try { - recordCueEvent({ - id: runId, - type: event.type, - triggerName: event.triggerName, - sessionId, - subscriptionName, - status: 'running', - payload: JSON.stringify(event.payload), - }); - } catch { - // Non-fatal if DB is unavailable - } - this.deps.onLog('cue', `[CUE] Run started: ${subscriptionName}`, { - type: 'runStarted', - runId, - sessionId, - subscriptionName, - }); - - try { - const runResult = await this.deps.onCueRun({ - runId, - sessionId, - prompt, - subscriptionName, - event, - timeoutMs, - }); - if (this.manuallyStoppedRuns.has(runId)) { - return; - } - result.status = runResult.status; - result.stdout = runResult.stdout; - result.stderr = runResult.stderr; - result.exitCode = runResult.exitCode; - - // Execute output prompt if the main task succeeded and an output prompt is configured - if (outputPrompt && result.status === 'completed') { - this.deps.onLog( - 'cue', - `[CUE] "${subscriptionName}" executing output prompt for downstream handoff` - ); - - const outputRunId = crypto.randomUUID(); - const outputEvent: CueEvent = { - ...event, - id: crypto.randomUUID(), - payload: { - ...event.payload, - sourceOutput: result.stdout.substring(0, SOURCE_OUTPUT_MAX_CHARS), - outputPromptPhase: true, - }, - }; - - try { - recordCueEvent({ - id: outputRunId, - type: event.type, - triggerName: event.triggerName, - sessionId, - subscriptionName: `${subscriptionName}:output`, - status: 'running', - payload: JSON.stringify(outputEvent.payload), - }); - } catch { - // Non-fatal if DB is unavailable - } - - const contextPrompt = `${outputPrompt}\n\n---\n\nContext from completed task:\n${result.stdout.substring(0, SOURCE_OUTPUT_MAX_CHARS)}`; - const outputResult = await this.deps.onCueRun({ - runId: outputRunId, - sessionId, - prompt: contextPrompt, - subscriptionName: `${subscriptionName}:output`, - event: outputEvent, - timeoutMs, - }); - - try { - updateCueEventStatus(outputRunId, outputResult.status); - } catch { - // Non-fatal if DB is unavailable - } - - if (this.manuallyStoppedRuns.has(runId)) { - return; - } - - if (outputResult.status === 'completed') { - result.stdout = outputResult.stdout; - } else { - this.deps.onLog( - 'cue', - `[CUE] "${subscriptionName}" output prompt failed (${outputResult.status}), using main task output` - ); - } - } - } catch (error) { - if (this.manuallyStoppedRuns.has(runId)) { - return; - } - result.status = 'failed'; - result.stderr = error instanceof Error ? error.message : String(error); - } finally { - result.endedAt = new Date().toISOString(); - result.durationMs = Date.now() - new Date(result.startedAt).getTime(); - this.activeRuns.delete(runId); - - // Decrement active run count and drain queue - const count = this.activeRunCount.get(sessionId) ?? 1; - this.activeRunCount.set(sessionId, Math.max(0, count - 1)); - this.drainQueue(sessionId); - - const wasManuallyStopped = this.manuallyStoppedRuns.has(runId); - if (wasManuallyStopped) { - try { - updateCueEventStatus(runId, 'stopped'); - } catch { - // Non-fatal if DB is unavailable - } - this.manuallyStoppedRuns.delete(runId); - } else { - this.pushActivityLog(result); - try { - updateCueEventStatus(runId, result.status); - } catch { - // Non-fatal if DB is unavailable - } - this.deps.onLog('cue', `[CUE] Run finished: ${subscriptionName} (${result.status})`, { - type: 'runFinished', - runId, - sessionId, - subscriptionName, - status: result.status, - }); - - // Emit completion event for agent completion chains - // This allows downstream subscriptions to react to this Cue run's completion - this.notifyAgentCompleted(sessionId, { - sessionName: result.sessionName, - status: result.status, - exitCode: result.exitCode, - durationMs: result.durationMs, - stdout: result.stdout, - triggeredBy: subscriptionName, - chainDepth: (chainDepth ?? 0) + 1, - }); - } - } - } - - /** - * Drain the event queue for a session, dispatching events while slots are available. - * Drops stale events that have exceeded the timeout. - */ - private drainQueue(sessionId: string): void { - const queue = this.eventQueue.get(sessionId); - if (!queue || queue.length === 0) return; - - const state = this.sessions.get(sessionId); - const maxConcurrent = state?.config.settings.max_concurrent ?? 1; - const timeoutMs = (state?.config.settings.timeout_minutes ?? 30) * 60 * 1000; - const sessionName = this.deps.getSessions().find((s) => s.id === sessionId)?.name ?? sessionId; - - while (queue.length > 0) { - const currentCount = this.activeRunCount.get(sessionId) ?? 0; - if (currentCount >= maxConcurrent) break; - - const entry = queue.shift()!; - const ageMs = Date.now() - entry.queuedAt; - - // Check for stale events - if (ageMs > timeoutMs) { - const ageMinutes = Math.round(ageMs / 60000); - this.deps.onLog( - 'cue', - `[CUE] Dropping stale queued event for "${sessionName}" (queued ${ageMinutes}m ago)` - ); - continue; - } - - // Dispatch the queued event - this.activeRunCount.set(sessionId, currentCount + 1); - this.doExecuteCueRun( - sessionId, - entry.prompt, - entry.event, - entry.subscriptionName, - entry.outputPrompt, - entry.chainDepth - ); - } - - // Clean up empty queue - if (queue.length === 0) { - this.eventQueue.delete(sessionId); - } - } - private pushActivityLog(result: CueRunResult): void { this.activityLog.push(result); - if (this.activityLog.length > ACTIVITY_LOG_MAX) { - this.activityLog = this.activityLog.slice(-ACTIVITY_LOG_MAX); - } } private teardownSession(sessionId: string): void { @@ -1502,82 +713,4 @@ export class CueEngine { } } } - - // --- Heartbeat & Sleep Detection --- - - private startHeartbeat(): void { - this.stopHeartbeat(); - try { - updateHeartbeat(); - } catch { - // Non-fatal if DB not ready - } - this.heartbeatInterval = setInterval(() => { - try { - updateHeartbeat(); - } catch { - // Non-fatal - } - }, HEARTBEAT_INTERVAL_MS); - } - - private stopHeartbeat(): void { - if (this.heartbeatInterval) { - clearInterval(this.heartbeatInterval); - this.heartbeatInterval = null; - } - } - - /** - * Check the last heartbeat to detect if the machine slept. - * If a gap >= SLEEP_THRESHOLD_MS is found, run the reconciler. - */ - private detectSleepAndReconcile(): void { - try { - const lastHeartbeat = getLastHeartbeat(); - if (lastHeartbeat === null) return; // First ever start — nothing to reconcile - - const now = Date.now(); - const gapMs = now - lastHeartbeat; - - if (gapMs < SLEEP_THRESHOLD_MS) return; - - const gapMinutes = Math.round(gapMs / 60_000); - this.deps.onLog( - 'cue', - `[CUE] Sleep detected (gap: ${gapMinutes}m). Reconciling missed events.` - ); - - // Build session info map for the reconciler - const reconcileSessions = new Map(); - const allSessions = this.deps.getSessions(); - for (const [sessionId, state] of this.sessions) { - const session = allSessions.find((s) => s.id === sessionId); - reconcileSessions.set(sessionId, { - config: state.config, - sessionName: session?.name ?? sessionId, - }); - } - - reconcileMissedTimeEvents({ - sleepStartMs: lastHeartbeat, - wakeTimeMs: now, - sessions: reconcileSessions, - onDispatch: (sessionId, sub, event) => { - this.executeCueRun( - sessionId, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }, - onLog: (level, message) => { - this.deps.onLog(level as MainLogLevel, message); - }, - }); - } catch (error) { - this.deps.onLog('warn', `[CUE] Sleep detection failed: ${error}`); - } - } } diff --git a/src/main/cue/cue-fan-in-tracker.ts b/src/main/cue/cue-fan-in-tracker.ts new file mode 100644 index 000000000..4401011a8 --- /dev/null +++ b/src/main/cue/cue-fan-in-tracker.ts @@ -0,0 +1,212 @@ +/** + * Fan-in completion tracker for the Cue Engine. + * + * Tracks multi-source agent.completed subscriptions: when a subscription + * lists multiple source_sessions, this module accumulates completions + * and fires the downstream subscription when all sources have reported + * (or on timeout, depending on the timeout_on_fail setting). + */ + +import * as crypto from 'crypto'; +import type { MainLogLevel } from '../../shared/logger-types'; +import type { SessionInfo } from '../../shared/types'; +import type { AgentCompletionData, CueEvent, CueSettings, CueSubscription } from './cue-types'; + +export const SOURCE_OUTPUT_MAX_CHARS = 5000; + +/** Stored data for a single fan-in source completion */ +export interface FanInSourceCompletion { + sessionId: string; + sessionName: string; + output: string; + truncated: boolean; + chainDepth: number; +} + +export interface CueFanInDeps { + onLog: (level: MainLogLevel, message: string, data?: unknown) => void; + getSessions: () => SessionInfo[]; + dispatchSubscription: ( + ownerSessionId: string, + sub: CueSubscription, + event: CueEvent, + sourceSessionName: string, + chainDepth?: number + ) => void; +} + +export interface CueFanInTracker { + handleCompletion( + ownerSessionId: string, + settings: CueSettings, + sub: CueSubscription, + sources: string[], + completedSessionId: string, + completedSessionName: string, + completionData?: AgentCompletionData + ): void; + clearForSession(sessionId: string): void; + reset(): void; +} + +export function createCueFanInTracker(deps: CueFanInDeps): CueFanInTracker { + const fanInTrackers = new Map>(); + const fanInTimers = new Map>(); + + function handleFanInTimeout( + key: string, + ownerSessionId: string, + settings: CueSettings, + sub: CueSubscription, + sources: string[] + ): void { + fanInTimers.delete(key); + const tracker = fanInTrackers.get(key); + if (!tracker) return; + + const completedNames = [...tracker.values()].map((c) => c.sessionName); + const completedIds = [...tracker.keys()]; + + // Determine which sources haven't completed yet + const allSessions = deps.getSessions(); + const timedOutSources = sources.filter((src) => { + const session = allSessions.find((s) => s.name === src || s.id === src); + const sessionId = session?.id ?? src; + return !completedIds.includes(sessionId) && !completedIds.includes(src); + }); + + if (settings.timeout_on_fail === 'continue') { + // Fire with partial data + const completions = [...tracker.values()]; + fanInTrackers.delete(key); + + const event: CueEvent = { + id: crypto.randomUUID(), + type: 'agent.completed', + timestamp: new Date().toISOString(), + triggerName: sub.name, + payload: { + completedSessions: completions.map((c) => c.sessionId), + timedOutSessions: timedOutSources, + sourceSession: completions.map((c) => c.sessionName).join(', '), + sourceOutput: completions.map((c) => c.output).join('\n---\n'), + outputTruncated: completions.some((c) => c.truncated), + partial: true, + }, + }; + const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth)); + deps.onLog( + 'cue', + `[CUE] Fan-in "${sub.name}" timed out (continue mode) — firing with ${completedNames.length}/${sources.length} sources` + ); + deps.dispatchSubscription( + ownerSessionId, + sub, + event, + completedNames.join(', '), + maxChainDepth + ); + } else { + // 'break' mode — log failure and clear + fanInTrackers.delete(key); + deps.onLog( + 'cue', + `[CUE] Fan-in "${sub.name}" timed out (break mode) — ${completedNames.length}/${sources.length} completed, waiting for: ${timedOutSources.join(', ')}` + ); + } + } + + return { + handleCompletion( + ownerSessionId: string, + settings: CueSettings, + sub: CueSubscription, + sources: string[], + completedSessionId: string, + completedSessionName: string, + completionData?: AgentCompletionData + ): void { + const key = `${ownerSessionId}:${sub.name}`; + + if (!fanInTrackers.has(key)) { + fanInTrackers.set(key, new Map()); + } + const tracker = fanInTrackers.get(key)!; + const rawOutput = completionData?.stdout ?? ''; + tracker.set(completedSessionId, { + sessionId: completedSessionId, + sessionName: completedSessionName, + output: rawOutput.slice(-SOURCE_OUTPUT_MAX_CHARS), + truncated: rawOutput.length > SOURCE_OUTPUT_MAX_CHARS, + chainDepth: completionData?.chainDepth ?? 0, + }); + + // Start timeout timer on first source completion + if (tracker.size === 1 && !fanInTimers.has(key)) { + const timeoutMs = (settings.timeout_minutes ?? 30) * 60 * 1000; + const timer = setTimeout(() => { + handleFanInTimeout(key, ownerSessionId, settings, sub, sources); + }, timeoutMs); + fanInTimers.set(key, timer); + } + + const remaining = sources.length - tracker.size; + if (remaining > 0) { + deps.onLog('cue', `[CUE] Fan-in "${sub.name}": waiting for ${remaining} more session(s)`); + return; + } + + // All sources completed — clear timer and fire + const timer = fanInTimers.get(key); + if (timer) { + clearTimeout(timer); + fanInTimers.delete(key); + } + fanInTrackers.delete(key); + + const completions = [...tracker.values()]; + const event: CueEvent = { + id: crypto.randomUUID(), + type: 'agent.completed', + timestamp: new Date().toISOString(), + triggerName: sub.name, + payload: { + completedSessions: completions.map((c) => c.sessionId), + sourceSession: completions.map((c) => c.sessionName).join(', '), + sourceOutput: completions.map((c) => c.output).join('\n---\n'), + outputTruncated: completions.some((c) => c.truncated), + }, + }; + const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth)); + deps.onLog('cue', `[CUE] "${sub.name}" triggered (agent.completed, fan-in complete)`); + deps.dispatchSubscription( + ownerSessionId, + sub, + event, + completions.map((c) => c.sessionName).join(', '), + maxChainDepth + ); + }, + + clearForSession(sessionId: string): void { + for (const key of [...fanInTrackers.keys()]) { + if (key.startsWith(`${sessionId}:`)) { + fanInTrackers.delete(key); + const timer = fanInTimers.get(key); + if (timer) { + clearTimeout(timer); + fanInTimers.delete(key); + } + } + } + }, + + reset(): void { + for (const timer of fanInTimers.values()) { + clearTimeout(timer); + } + fanInTrackers.clear(); + fanInTimers.clear(); + }, + }; +} diff --git a/src/main/cue/cue-heartbeat.ts b/src/main/cue/cue-heartbeat.ts new file mode 100644 index 000000000..227eda7ab --- /dev/null +++ b/src/main/cue/cue-heartbeat.ts @@ -0,0 +1,102 @@ +/** + * Heartbeat writer and sleep/wake detection for the Cue Engine. + * + * Writes a heartbeat timestamp to the Cue database every 30 seconds. + * On engine start, checks the gap since the last heartbeat; if the gap + * exceeds 2 minutes, triggers the reconciler for missed time-based events. + */ + +import type { MainLogLevel } from '../../shared/logger-types'; +import { updateHeartbeat, getLastHeartbeat } from './cue-db'; +import { reconcileMissedTimeEvents } from './cue-reconciler'; +import type { ReconcileSessionInfo } from './cue-reconciler'; +import type { CueConfig, CueEvent, CueSubscription } from './cue-types'; + +export const HEARTBEAT_INTERVAL_MS = 30_000; // 30 seconds +export const SLEEP_THRESHOLD_MS = 120_000; // 2 minutes +export const EVENT_PRUNE_AGE_MS = 7 * 24 * 60 * 60 * 1000; // 7 days + +export interface CueHeartbeatDeps { + onLog: (level: MainLogLevel, message: string, data?: unknown) => void; + getSessions: () => Map; + onDispatch: (sessionId: string, sub: CueSubscription, event: CueEvent) => void; +} + +export interface CueHeartbeat { + start(): void; + stop(): void; + /** Run sleep detection and reconciliation (also called on engine start) */ + detectSleepAndReconcile(): void; +} + +export function createCueHeartbeat(deps: CueHeartbeatDeps): CueHeartbeat { + let heartbeatInterval: ReturnType | null = null; + + function startHeartbeat(): void { + stopHeartbeat(); + try { + updateHeartbeat(); + } catch { + // Non-fatal if DB not ready + } + heartbeatInterval = setInterval(() => { + try { + updateHeartbeat(); + } catch { + // Non-fatal + } + }, HEARTBEAT_INTERVAL_MS); + } + + function stopHeartbeat(): void { + if (heartbeatInterval) { + clearInterval(heartbeatInterval); + heartbeatInterval = null; + } + } + + function detectSleepAndReconcile(): void { + try { + const lastHeartbeat = getLastHeartbeat(); + if (lastHeartbeat === null) return; // First ever start — nothing to reconcile + + const now = Date.now(); + const gapMs = now - lastHeartbeat; + + if (gapMs < SLEEP_THRESHOLD_MS) return; + + const gapMinutes = Math.round(gapMs / 60_000); + deps.onLog('cue', `[CUE] Sleep detected (gap: ${gapMinutes}m). Reconciling missed events.`); + + // Build session info map for the reconciler + const reconcileSessions = new Map(); + const sessions = deps.getSessions(); + for (const [sessionId, state] of sessions) { + reconcileSessions.set(sessionId, { + config: state.config, + sessionName: state.sessionName, + }); + } + + reconcileMissedTimeEvents({ + sleepStartMs: lastHeartbeat, + wakeTimeMs: now, + sessions: reconcileSessions, + onDispatch: (sessionId, sub, event) => { + deps.onDispatch(sessionId, sub, event); + }, + onLog: (level, message) => { + deps.onLog(level as MainLogLevel, message); + }, + }); + } catch (error) { + deps.onLog('warn', `[CUE] Sleep detection failed: ${error}`); + } + } + + return { + start: startHeartbeat, + stop: stopHeartbeat, + detectSleepAndReconcile, + }; +} diff --git a/src/main/cue/cue-run-manager.ts b/src/main/cue/cue-run-manager.ts new file mode 100644 index 000000000..b630afe03 --- /dev/null +++ b/src/main/cue/cue-run-manager.ts @@ -0,0 +1,412 @@ +/** + * Cue Run Manager — concurrency control, queue management, and run execution. + * + * Manages the lifecycle of Cue run executions: + * - Concurrency gating (max_concurrent per session) + * - Event queuing when at concurrency limit + * - Queue draining when slots free + * - Active run tracking and stop controls + * - Output prompt execution (two-phase runs) + * - Completion event emission for chain propagation + */ + +import * as crypto from 'crypto'; +import type { MainLogLevel } from '../../shared/logger-types'; +import type { CueEvent, CueRunResult, CueSettings, CueSubscription } from './cue-types'; +import { recordCueEvent, updateCueEventStatus } from './cue-db'; +import { SOURCE_OUTPUT_MAX_CHARS } from './cue-fan-in-tracker'; + +/** Active run tracking */ +export interface ActiveRun { + result: CueRunResult; + abortController?: AbortController; +} + +/** A queued event waiting for a concurrency slot */ +export interface QueuedEvent { + event: CueEvent; + subscription: CueSubscription; + prompt: string; + outputPrompt?: string; + subscriptionName: string; + queuedAt: number; + chainDepth?: number; +} + +export interface CueRunManagerDeps { + getSessions: () => { id: string; name: string }[]; + getSessionSettings: (sessionId: string) => CueSettings | undefined; + onCueRun: (request: { + runId: string; + sessionId: string; + prompt: string; + subscriptionName: string; + event: CueEvent; + timeoutMs: number; + }) => Promise; + onStopCueRun?: (runId: string) => boolean; + onLog: (level: MainLogLevel, message: string, data?: unknown) => void; + /** Called when a run finishes naturally (completed/failed/timeout) — pushes to activity log AND triggers chain propagation */ + onRunCompleted: ( + sessionId: string, + result: CueRunResult, + subscriptionName: string, + chainDepth?: number + ) => void; + /** Called when a run is manually stopped — pushes to activity log only (no chain propagation) */ + onRunStopped: (result: CueRunResult) => void; +} + +export interface CueRunManager { + execute( + sessionId: string, + prompt: string, + event: CueEvent, + subscriptionName: string, + outputPrompt?: string, + chainDepth?: number + ): void; + stopRun(runId: string): boolean; + stopAll(): void; + getActiveRuns(): CueRunResult[]; + getActiveRunMap(): Map; + getQueueStatus(): Map; + clearQueue(sessionId: string): void; + reset(): void; +} + +export function createCueRunManager(deps: CueRunManagerDeps): CueRunManager { + const activeRuns = new Map(); + const activeRunCount = new Map(); + const eventQueue = new Map(); + const manuallyStoppedRuns = new Set(); + + function getSessionName(sessionId: string): string { + return deps.getSessions().find((s) => s.id === sessionId)?.name ?? sessionId; + } + + function drainQueue(sessionId: string): void { + const queue = eventQueue.get(sessionId); + if (!queue || queue.length === 0) return; + + const settings = deps.getSessionSettings(sessionId); + const maxConcurrent = settings?.max_concurrent ?? 1; + const timeoutMs = (settings?.timeout_minutes ?? 30) * 60 * 1000; + const sessionName = getSessionName(sessionId); + + while (queue.length > 0) { + const currentCount = activeRunCount.get(sessionId) ?? 0; + if (currentCount >= maxConcurrent) break; + + const entry = queue.shift()!; + const ageMs = Date.now() - entry.queuedAt; + + // Check for stale events + if (ageMs > timeoutMs) { + const ageMinutes = Math.round(ageMs / 60000); + deps.onLog( + 'cue', + `[CUE] Dropping stale queued event for "${sessionName}" (queued ${ageMinutes}m ago)` + ); + continue; + } + + // Dispatch the queued event + activeRunCount.set(sessionId, currentCount + 1); + doExecuteCueRun( + sessionId, + entry.prompt, + entry.event, + entry.subscriptionName, + entry.outputPrompt, + entry.chainDepth + ); + } + + // Clean up empty queue + if (queue.length === 0) { + eventQueue.delete(sessionId); + } + } + + async function doExecuteCueRun( + sessionId: string, + prompt: string, + event: CueEvent, + subscriptionName: string, + outputPrompt?: string, + chainDepth?: number + ): Promise { + const sessionName = getSessionName(sessionId); + const settings = deps.getSessionSettings(sessionId); + const runId = crypto.randomUUID(); + const abortController = new AbortController(); + + const result: CueRunResult = { + runId, + sessionId, + sessionName, + subscriptionName, + event, + status: 'running', + stdout: '', + stderr: '', + exitCode: null, + durationMs: 0, + startedAt: new Date().toISOString(), + endedAt: '', + }; + + activeRuns.set(runId, { result, abortController }); + const timeoutMs = (settings?.timeout_minutes ?? 30) * 60 * 1000; + try { + recordCueEvent({ + id: runId, + type: event.type, + triggerName: event.triggerName, + sessionId, + subscriptionName, + status: 'running', + payload: JSON.stringify(event.payload), + }); + } catch { + // Non-fatal if DB is unavailable + } + deps.onLog('cue', `[CUE] Run started: ${subscriptionName}`, { + type: 'runStarted', + runId, + sessionId, + subscriptionName, + }); + + try { + const runResult = await deps.onCueRun({ + runId, + sessionId, + prompt, + subscriptionName, + event, + timeoutMs, + }); + if (manuallyStoppedRuns.has(runId)) { + return; + } + result.status = runResult.status; + result.stdout = runResult.stdout; + result.stderr = runResult.stderr; + result.exitCode = runResult.exitCode; + + // Execute output prompt if the main task succeeded and an output prompt is configured + if (outputPrompt && result.status === 'completed') { + deps.onLog( + 'cue', + `[CUE] "${subscriptionName}" executing output prompt for downstream handoff` + ); + + const outputRunId = crypto.randomUUID(); + const outputEvent: CueEvent = { + ...event, + id: crypto.randomUUID(), + payload: { + ...event.payload, + sourceOutput: result.stdout.substring(0, SOURCE_OUTPUT_MAX_CHARS), + outputPromptPhase: true, + }, + }; + + try { + recordCueEvent({ + id: outputRunId, + type: event.type, + triggerName: event.triggerName, + sessionId, + subscriptionName: `${subscriptionName}:output`, + status: 'running', + payload: JSON.stringify(outputEvent.payload), + }); + } catch { + // Non-fatal if DB is unavailable + } + + const contextPrompt = `${outputPrompt}\n\n---\n\nContext from completed task:\n${result.stdout.substring(0, SOURCE_OUTPUT_MAX_CHARS)}`; + const outputResult = await deps.onCueRun({ + runId: outputRunId, + sessionId, + prompt: contextPrompt, + subscriptionName: `${subscriptionName}:output`, + event: outputEvent, + timeoutMs, + }); + + try { + updateCueEventStatus(outputRunId, outputResult.status); + } catch { + // Non-fatal if DB is unavailable + } + + if (manuallyStoppedRuns.has(runId)) { + return; + } + + if (outputResult.status === 'completed') { + result.stdout = outputResult.stdout; + } else { + deps.onLog( + 'cue', + `[CUE] "${subscriptionName}" output prompt failed (${outputResult.status}), using main task output` + ); + } + } + } catch (error) { + if (manuallyStoppedRuns.has(runId)) { + return; + } + result.status = 'failed'; + result.stderr = error instanceof Error ? error.message : String(error); + } finally { + result.endedAt = new Date().toISOString(); + result.durationMs = Date.now() - new Date(result.startedAt).getTime(); + activeRuns.delete(runId); + + // Decrement active run count and drain queue + const count = activeRunCount.get(sessionId) ?? 1; + activeRunCount.set(sessionId, Math.max(0, count - 1)); + drainQueue(sessionId); + + const wasManuallyStopped = manuallyStoppedRuns.has(runId); + if (wasManuallyStopped) { + try { + updateCueEventStatus(runId, 'stopped'); + } catch { + // Non-fatal if DB is unavailable + } + manuallyStoppedRuns.delete(runId); + } else { + try { + updateCueEventStatus(runId, result.status); + } catch { + // Non-fatal if DB is unavailable + } + deps.onLog('cue', `[CUE] Run finished: ${subscriptionName} (${result.status})`, { + type: 'runFinished', + runId, + sessionId, + subscriptionName, + status: result.status, + }); + + // Notify engine of completion (for activity log + chain propagation) + deps.onRunCompleted(sessionId, result, subscriptionName, chainDepth); + } + } + } + + return { + execute( + sessionId: string, + prompt: string, + event: CueEvent, + subscriptionName: string, + outputPrompt?: string, + chainDepth?: number + ): void { + const settings = deps.getSessionSettings(sessionId); + const maxConcurrent = settings?.max_concurrent ?? 1; + const queueSize = settings?.queue_size ?? 10; + const currentCount = activeRunCount.get(sessionId) ?? 0; + + if (currentCount >= maxConcurrent) { + // At concurrency limit — queue the event + const sessionName = getSessionName(sessionId); + if (!eventQueue.has(sessionId)) { + eventQueue.set(sessionId, []); + } + const queue = eventQueue.get(sessionId)!; + + if (queue.length >= queueSize) { + // Drop the oldest entry + queue.shift(); + deps.onLog('cue', `[CUE] Queue full for "${sessionName}", dropping oldest event`); + } + + queue.push({ + event, + subscription: { name: subscriptionName, event: event.type, enabled: true, prompt }, + prompt, + outputPrompt, + subscriptionName, + queuedAt: Date.now(), + chainDepth, + }); + + deps.onLog( + 'cue', + `[CUE] Event queued for "${sessionName}" (${queue.length}/${queueSize} in queue, ${currentCount}/${maxConcurrent} concurrent)` + ); + return; + } + + // Slot available — dispatch immediately + activeRunCount.set(sessionId, currentCount + 1); + doExecuteCueRun(sessionId, prompt, event, subscriptionName, outputPrompt, chainDepth); + }, + + stopRun(runId: string): boolean { + const run = activeRuns.get(runId); + if (!run) return false; + + manuallyStoppedRuns.add(runId); + deps.onStopCueRun?.(runId); + run.abortController?.abort(); + run.result.status = 'stopped'; + run.result.endedAt = new Date().toISOString(); + run.result.durationMs = Date.now() - new Date(run.result.startedAt).getTime(); + + activeRuns.delete(runId); + deps.onRunStopped(run.result); + deps.onLog('cue', `[CUE] Run stopped: ${runId}`, { + type: 'runStopped', + runId, + sessionId: run.result.sessionId, + subscriptionName: run.result.subscriptionName, + }); + return true; + }, + + stopAll(): void { + for (const [runId] of activeRuns) { + this.stopRun(runId); + } + eventQueue.clear(); + }, + + getActiveRuns(): CueRunResult[] { + return [...activeRuns.values()].map((r) => r.result); + }, + + getActiveRunMap(): Map { + return activeRuns; + }, + + getQueueStatus(): Map { + const result = new Map(); + for (const [sessionId, queue] of eventQueue) { + if (queue.length > 0) { + result.set(sessionId, queue.length); + } + } + return result; + }, + + clearQueue(sessionId: string): void { + eventQueue.delete(sessionId); + }, + + reset(): void { + activeRuns.clear(); + activeRunCount.clear(); + eventQueue.clear(); + manuallyStoppedRuns.clear(); + }, + }; +} diff --git a/src/main/cue/cue-subscription-setup.ts b/src/main/cue/cue-subscription-setup.ts new file mode 100644 index 000000000..e52867507 --- /dev/null +++ b/src/main/cue/cue-subscription-setup.ts @@ -0,0 +1,365 @@ +/** + * Subscription setup logic for the Cue Engine. + * + * Sets up event source subscriptions (timers, file watchers, pollers, task scanners) + * for a session's Cue config. Each setup method creates the necessary watchers/timers + * and wires them to the engine's event dispatch pipeline. + */ + +import * as crypto from 'crypto'; +import type { MainLogLevel } from '../../shared/logger-types'; +import type { SessionInfo } from '../../shared/types'; +import type { CueEvent, CueSubscription } from './cue-types'; +import { createCueFileWatcher } from './cue-file-watcher'; +import { createCueGitHubPoller } from './cue-github-poller'; +import { createCueTaskScanner } from './cue-task-scanner'; +import { matchesFilter, describeFilter } from './cue-filter'; + +export const DEFAULT_FILE_DEBOUNCE_MS = 5000; + +const DAY_NAMES = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'] as const; + +/** + * Calculates the next occurrence of a scheduled time. + * Returns a timestamp in ms, or null if inputs are invalid. + */ +export function calculateNextScheduledTime(times: string[], days?: string[]): number | null { + if (times.length === 0) return null; + + const now = new Date(); + const candidates: number[] = []; + + // Check up to 7 days ahead to find the next match + for (let dayOffset = 0; dayOffset < 7; dayOffset++) { + const candidate = new Date(now); + candidate.setDate(candidate.getDate() + dayOffset); + const dayName = DAY_NAMES[candidate.getDay()]; + + if (days && days.length > 0 && !days.includes(dayName)) continue; + + for (const time of times) { + const [hourStr, minStr] = time.split(':'); + const hour = parseInt(hourStr, 10); + const min = parseInt(minStr, 10); + if (isNaN(hour) || isNaN(min)) continue; + + const target = new Date(candidate); + target.setHours(hour, min, 0, 0); + + if (target.getTime() > now.getTime()) { + candidates.push(target.getTime()); + } + } + } + + return candidates.length > 0 ? Math.min(...candidates) : null; +} + +/** Mutable state passed to subscription setup functions */ +export interface SubscriptionSetupState { + timers: ReturnType[]; + watchers: (() => void)[]; + lastTriggered?: string; + nextTriggers: Map; +} + +/** Dependencies for subscription setup */ +export interface SubscriptionSetupDeps { + enabled: () => boolean; + scheduledFiredKeys: Set; + onLog: (level: MainLogLevel, message: string, data?: unknown) => void; + executeCueRun: ( + sessionId: string, + prompt: string, + event: CueEvent, + subscriptionName: string, + outputPrompt?: string + ) => void; +} + +export function setupHeartbeatSubscription( + deps: SubscriptionSetupDeps, + session: SessionInfo, + state: SubscriptionSetupState, + sub: { + name: string; + prompt: string; + prompt_file?: string; + output_prompt?: string; + interval_minutes?: number; + filter?: Record; + } +): void { + const intervalMs = (sub.interval_minutes ?? 0) * 60 * 1000; + if (intervalMs <= 0) return; + + // Fire immediately on first setup + const immediateEvent: CueEvent = { + id: crypto.randomUUID(), + type: 'time.heartbeat', + timestamp: new Date().toISOString(), + triggerName: sub.name, + payload: { interval_minutes: sub.interval_minutes }, + }; + + // Check payload filter (even for timer events) + if (!sub.filter || matchesFilter(immediateEvent.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.heartbeat, initial)`); + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + immediateEvent, + sub.name, + sub.output_prompt + ); + } else { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + } + + // Then on the interval + const timer = setInterval(() => { + if (!deps.enabled()) return; + + const event: CueEvent = { + id: crypto.randomUUID(), + type: 'time.heartbeat', + timestamp: new Date().toISOString(), + triggerName: sub.name, + payload: { interval_minutes: sub.interval_minutes }, + }; + + // Check payload filter + if (sub.filter && !matchesFilter(event.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + return; + } + + deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.heartbeat)`); + state.lastTriggered = event.timestamp; + state.nextTriggers.set(sub.name, Date.now() + intervalMs); + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + event, + sub.name, + sub.output_prompt + ); + }, intervalMs); + + state.nextTriggers.set(sub.name, Date.now() + intervalMs); + state.timers.push(timer); +} + +export function setupScheduledSubscription( + deps: SubscriptionSetupDeps, + session: SessionInfo, + state: SubscriptionSetupState, + sub: { + name: string; + prompt: string; + prompt_file?: string; + output_prompt?: string; + schedule_times?: string[]; + schedule_days?: string[]; + filter?: Record; + } +): void { + const times = sub.schedule_times ?? []; + if (times.length === 0) return; + + const checkAndFire = () => { + if (!deps.enabled()) return; + + const now = new Date(); + const currentDay = DAY_NAMES[now.getDay()]; + const currentTime = `${String(now.getHours()).padStart(2, '0')}:${String(now.getMinutes()).padStart(2, '0')}`; + + // Check day filter (if specified, current day must match) + if (sub.schedule_days && sub.schedule_days.length > 0) { + if (!sub.schedule_days.includes(currentDay)) { + return; + } + } + + // Check if current time matches any scheduled time + if (!times.includes(currentTime)) { + // Evict stale fired-keys from previous minutes + for (const key of deps.scheduledFiredKeys) { + if (key.startsWith(`${session.id}:${sub.name}:`) && !key.endsWith(`:${currentTime}`)) { + deps.scheduledFiredKeys.delete(key); + } + } + return; + } + + // Guard against double-fire (e.g., config refresh within the same minute) + const firedKey = `${session.id}:${sub.name}:${currentTime}`; + if (deps.scheduledFiredKeys.has(firedKey)) { + return; + } + deps.scheduledFiredKeys.add(firedKey); + + const event: CueEvent = { + id: crypto.randomUUID(), + type: 'time.scheduled', + timestamp: now.toISOString(), + triggerName: sub.name, + payload: { + schedule_times: sub.schedule_times, + schedule_days: sub.schedule_days, + matched_time: currentTime, + matched_day: currentDay, + }, + }; + + if (sub.filter && !matchesFilter(event.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + return; + } + + deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.scheduled, ${currentTime})`); + state.lastTriggered = event.timestamp; + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + event, + sub.name, + sub.output_prompt + ); + }; + + // Check every 60 seconds to catch scheduled times + const timer = setInterval(checkAndFire, 60_000); + state.timers.push(timer); + + // Calculate and track the next trigger time + const nextMs = calculateNextScheduledTime(times, sub.schedule_days); + if (nextMs != null) { + state.nextTriggers.set(sub.name, nextMs); + } +} + +export function setupFileWatcherSubscription( + deps: SubscriptionSetupDeps, + session: SessionInfo, + state: SubscriptionSetupState, + sub: { + name: string; + prompt: string; + prompt_file?: string; + output_prompt?: string; + watch?: string; + filter?: Record; + } +): void { + if (!sub.watch) return; + + const cleanup = createCueFileWatcher({ + watchGlob: sub.watch, + projectRoot: session.projectRoot, + debounceMs: DEFAULT_FILE_DEBOUNCE_MS, + triggerName: sub.name, + onLog: (level, message) => deps.onLog(level as MainLogLevel, message), + onEvent: (event) => { + if (!deps.enabled()) return; + + // Check payload filter + if (sub.filter && !matchesFilter(event.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + return; + } + + deps.onLog('cue', `[CUE] "${sub.name}" triggered (file.changed)`); + state.lastTriggered = event.timestamp; + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + event, + sub.name, + sub.output_prompt + ); + }, + }); + + state.watchers.push(cleanup); +} + +export function setupGitHubPollerSubscription( + deps: SubscriptionSetupDeps, + session: SessionInfo, + state: SubscriptionSetupState, + sub: CueSubscription +): void { + const cleanup = createCueGitHubPoller({ + eventType: sub.event as 'github.pull_request' | 'github.issue', + repo: sub.repo, + pollMinutes: sub.poll_minutes ?? 5, + projectRoot: session.projectRoot, + triggerName: sub.name, + subscriptionId: `${session.id}:${sub.name}`, + ghState: sub.gh_state, + onLog: (level, message) => deps.onLog(level as MainLogLevel, message), + onEvent: (event) => { + if (!deps.enabled()) return; + + // Check payload filter + if (sub.filter && !matchesFilter(event.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + return; + } + + deps.onLog('cue', `[CUE] "${sub.name}" triggered (${sub.event})`); + state.lastTriggered = event.timestamp; + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + event, + sub.name, + sub.output_prompt + ); + }, + }); + + state.watchers.push(cleanup); +} + +export function setupTaskScannerSubscription( + deps: SubscriptionSetupDeps, + session: SessionInfo, + state: SubscriptionSetupState, + sub: CueSubscription +): void { + if (!sub.watch) return; + + const cleanup = createCueTaskScanner({ + watchGlob: sub.watch, + pollMinutes: sub.poll_minutes ?? 1, + projectRoot: session.projectRoot, + triggerName: sub.name, + onLog: (level, message) => deps.onLog(level as MainLogLevel, message), + onEvent: (event) => { + if (!deps.enabled()) return; + + // Check payload filter + if (sub.filter && !matchesFilter(event.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + return; + } + + deps.onLog( + 'cue', + `[CUE] "${sub.name}" triggered (task.pending: ${event.payload.taskCount} task(s) in ${event.payload.filename})` + ); + state.lastTriggered = event.timestamp; + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + event, + sub.name, + sub.output_prompt + ); + }, + }); + + state.watchers.push(cleanup); +} diff --git a/src/renderer/components/CueGraphView.tsx b/src/renderer/components/CueGraphView.tsx deleted file mode 100644 index c53aa6ae3..000000000 --- a/src/renderer/components/CueGraphView.tsx +++ /dev/null @@ -1,1166 +0,0 @@ -/** - * CueGraphView — Canvas-based visualization of Maestro Cue subscription graph. - * - * Shows how triggers (events) connect to agents, and how agents chain to other agents - * via agent.completed subscriptions. Follows the same canvas-based rendering approach - * as the Document Graph MindMap for visual consistency. - * - * Features: - * - Trigger nodes (event sources) on the left - * - Agent nodes in the center/right - * - Edges showing subscription connections with labels - * - Pan/zoom with mouse - * - Click-and-drag to reposition individual nodes - * - Layout algorithm dropdown (Hierarchical, Force-Directed) - * - Double-click an agent node to switch focus and close the modal - */ - -import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; -import { - forceSimulation, - forceLink, - forceManyBody, - forceCenter, - forceCollide, - forceX, - forceY, - type SimulationNodeDatum, - type SimulationLinkDatum, -} from 'd3-force'; -import { RefreshCw, Network, ChevronDown } from 'lucide-react'; -import type { Theme } from '../types'; -import { useSessionStore } from '../stores/sessionStore'; - -// ============================================================================ -// Types -// ============================================================================ - -interface CueGraphViewProps { - theme: Theme; - onClose: () => void; -} - -type GraphNodeType = 'agent' | 'trigger'; - -interface GraphNode extends SimulationNodeDatum { - id: string; - type: GraphNodeType; - label: string; - sublabel: string; - sessionId?: string; - toolType?: string; - subscriptionCount?: number; - eventType?: string; - width: number; - height: number; - /** Depth in the dependency graph (0 = trigger, 1+ = agents) */ - depth?: number; -} - -interface GraphEdge extends SimulationLinkDatum { - id: string; - label: string; - sourceId: string; - targetId: string; -} - -type CueLayoutType = 'hierarchical' | 'force'; - -const LAYOUT_LABELS: Record = { - hierarchical: { name: 'Hierarchical', description: 'Left-to-right layers' }, - force: { name: 'Force', description: 'Physics simulation' }, -}; - -// ============================================================================ -// Constants -// ============================================================================ - -const CUE_TEAL = '#06b6d4'; - -const AGENT_NODE_WIDTH = 180; -const AGENT_NODE_HEIGHT = 56; -const TRIGGER_NODE_WIDTH = 160; -const TRIGGER_NODE_HEIGHT = 44; -const NODE_BORDER_RADIUS = 10; - -const EVENT_COLORS: Record = { - 'time.heartbeat': '#f59e0b', - 'time.scheduled': '#8b5cf6', - 'file.changed': '#3b82f6', - 'agent.completed': '#22c55e', - 'github.pull_request': '#a855f7', - 'github.issue': '#f97316', - 'task.pending': CUE_TEAL, -}; - -const EVENT_LABELS: Record = { - 'time.heartbeat': 'Heartbeat', - 'time.scheduled': 'Scheduled', - 'file.changed': 'File Watch', - 'agent.completed': 'Agent Done', - 'github.pull_request': 'GitHub PR', - 'github.issue': 'GitHub Issue', - 'task.pending': 'Task Queue', -}; - -// ============================================================================ -// Graph Data Builder -// ============================================================================ - -interface GraphData { - nodes: GraphNode[]; - edges: GraphEdge[]; -} - -function buildGraphData( - graphSessions: Array<{ - sessionId: string; - sessionName: string; - toolType: string; - subscriptions: Array<{ - name: string; - event: string; - enabled: boolean; - source_session?: string | string[]; - fan_out?: string[]; - watch?: string; - interval_minutes?: number; - repo?: string; - poll_minutes?: number; - }>; - }>, - allSessions: Array<{ id: string; name: string; toolType: string }> -): GraphData { - const nodes: GraphNode[] = []; - const edges: GraphEdge[] = []; - const nodeIds = new Set(); - const triggerKeys = new Map(); // composite key → node id - - // Add agent nodes for all Cue-enabled sessions - for (const gs of graphSessions) { - const nodeId = `agent:${gs.sessionId}`; - if (!nodeIds.has(nodeId)) { - nodes.push({ - id: nodeId, - type: 'agent', - label: gs.sessionName, - sublabel: gs.toolType, - sessionId: gs.sessionId, - toolType: gs.toolType, - subscriptionCount: gs.subscriptions.filter((s) => s.enabled !== false).length, - width: AGENT_NODE_WIDTH, - height: AGENT_NODE_HEIGHT, - }); - nodeIds.add(nodeId); - } - } - - // Helper to ensure an agent node exists (for referenced agents not in graph sessions) - function ensureAgentNode(sessionName: string) { - // Try to find by name in graphSessions first - const gs = graphSessions.find((s) => s.sessionName === sessionName); - if (gs) return `agent:${gs.sessionId}`; - - // Try to find in all sessions - const session = allSessions.find((s) => s.name === sessionName); - const nodeId = session ? `agent:${session.id}` : `agent:ref:${sessionName}`; - - if (!nodeIds.has(nodeId)) { - nodes.push({ - id: nodeId, - type: 'agent', - label: sessionName, - sublabel: session?.toolType ?? 'unknown', - sessionId: session?.id, - toolType: session?.toolType, - width: AGENT_NODE_WIDTH, - height: AGENT_NODE_HEIGHT, - }); - nodeIds.add(nodeId); - } - return nodeId; - } - - // Process subscriptions - for (const gs of graphSessions) { - const agentNodeId = `agent:${gs.sessionId}`; - - for (const sub of gs.subscriptions) { - if (sub.enabled === false) continue; - - if (sub.event === 'agent.completed') { - // Agent → Agent connection - const sources = Array.isArray(sub.source_session) - ? sub.source_session - : sub.source_session - ? [sub.source_session] - : []; - - for (const sourceName of sources) { - const sourceNodeId = ensureAgentNode(sourceName); - edges.push({ - id: `edge:${sourceNodeId}→${agentNodeId}:${sub.name}`, - source: sourceNodeId, - target: agentNodeId, - sourceId: sourceNodeId, - targetId: agentNodeId, - label: sub.name, - }); - } - - // Also handle fan_out targets - if (sub.fan_out) { - for (const targetName of sub.fan_out) { - const targetNodeId = ensureAgentNode(targetName); - edges.push({ - id: `edge:${agentNodeId}→${targetNodeId}:${sub.name}:fanout`, - source: agentNodeId, - target: targetNodeId, - sourceId: agentNodeId, - targetId: targetNodeId, - label: `${sub.name} (fan-out)`, - }); - } - } - } else { - // Trigger → Agent connection - const triggerDetail = getTriggerDetail(sub); - const triggerKey = `${sub.event}:${triggerDetail}`; - - let triggerNodeId = triggerKeys.get(triggerKey); - if (!triggerNodeId) { - triggerNodeId = `trigger:${triggerKey}`; - triggerKeys.set(triggerKey, triggerNodeId); - - nodes.push({ - id: triggerNodeId, - type: 'trigger', - label: EVENT_LABELS[sub.event] ?? sub.event, - sublabel: triggerDetail, - eventType: sub.event, - width: TRIGGER_NODE_WIDTH, - height: TRIGGER_NODE_HEIGHT, - }); - nodeIds.add(triggerNodeId); - } - - // Edge from trigger to this agent - edges.push({ - id: `edge:${triggerNodeId}→${agentNodeId}:${sub.name}`, - source: triggerNodeId, - target: agentNodeId, - sourceId: triggerNodeId, - targetId: agentNodeId, - label: sub.name, - }); - - // Handle fan_out for non-agent.completed events - if (sub.fan_out) { - for (const targetName of sub.fan_out) { - const targetNodeId = ensureAgentNode(targetName); - edges.push({ - id: `edge:${triggerNodeId}→${targetNodeId}:${sub.name}:fanout`, - source: triggerNodeId, - target: targetNodeId, - sourceId: triggerNodeId, - targetId: targetNodeId, - label: `${sub.name} (fan-out)`, - }); - } - } - } - } - } - - return { nodes, edges }; -} - -function getTriggerDetail(sub: { - event: string; - watch?: string; - interval_minutes?: number; - repo?: string; - poll_minutes?: number; -}): string { - switch (sub.event) { - case 'time.heartbeat': - return sub.interval_minutes ? `${sub.interval_minutes}m` : 'heartbeat'; - case 'file.changed': - return sub.watch ?? '**/*'; - case 'github.pull_request': - case 'github.issue': - return sub.repo ?? 'repo'; - case 'task.pending': - return sub.watch ?? 'tasks'; - default: - return sub.event; - } -} - -// ============================================================================ -// Layout: Hierarchical (left-to-right layers) -// ============================================================================ - -function layoutHierarchical( - nodes: GraphNode[], - edges: GraphEdge[], - width: number, - height: number -): void { - if (nodes.length === 0) return; - - // Build adjacency for depth calculation (source → targets) - const outgoing = new Map(); - for (const edge of edges) { - const srcId = typeof edge.source === 'string' ? edge.source : (edge.source as GraphNode).id; - const tgtId = typeof edge.target === 'string' ? edge.target : (edge.target as GraphNode).id; - if (!outgoing.has(srcId)) outgoing.set(srcId, []); - outgoing.get(srcId)!.push(tgtId); - } - - // Assign depth via BFS from triggers (depth 0) - const depthMap = new Map(); - const triggers = nodes.filter((n) => n.type === 'trigger'); - const agents = nodes.filter((n) => n.type === 'agent'); - - // All triggers are depth 0 - for (const t of triggers) { - depthMap.set(t.id, 0); - } - - // BFS to assign depths to agents - const queue: string[] = triggers.map((t) => t.id); - while (queue.length > 0) { - const current = queue.shift()!; - const currentDepth = depthMap.get(current) ?? 0; - const targets = outgoing.get(current) ?? []; - for (const targetId of targets) { - const existingDepth = depthMap.get(targetId); - const newDepth = currentDepth + 1; - if (existingDepth === undefined || newDepth > existingDepth) { - depthMap.set(targetId, newDepth); - queue.push(targetId); - } - } - } - - // Agents without edges get depth 1 - for (const a of agents) { - if (!depthMap.has(a.id)) { - depthMap.set(a.id, 1); - } - } - - // Store depth on nodes - for (const node of nodes) { - node.depth = depthMap.get(node.id) ?? 0; - } - - // Group nodes by depth - const layers = new Map(); - for (const node of nodes) { - const d = node.depth!; - if (!layers.has(d)) layers.set(d, []); - layers.get(d)!.push(node); - } - - const sortedDepths = Array.from(layers.keys()).sort((a, b) => a - b); - const numLayers = sortedDepths.length; - if (numLayers === 0) return; - - // Calculate horizontal spacing - const horizontalPadding = 80; - const availableWidth = width - horizontalPadding * 2; - const layerSpacing = numLayers > 1 ? availableWidth / (numLayers - 1) : 0; - - // Position each layer - for (let i = 0; i < sortedDepths.length; i++) { - const depth = sortedDepths[i]; - const layerNodes = layers.get(depth)!; - - // X position for this layer - const layerX = numLayers === 1 ? width / 2 : horizontalPadding + i * layerSpacing; - - // Vertical spacing within layer - const verticalPadding = 40; - const maxNodeHeight = Math.max(...layerNodes.map((n) => n.height)); - const nodeSpacing = maxNodeHeight + 30; - const totalLayerHeight = layerNodes.length * nodeSpacing - 30; - const startY = (height - totalLayerHeight) / 2 + verticalPadding / 2; - - for (let j = 0; j < layerNodes.length; j++) { - layerNodes[j].x = layerX; - layerNodes[j].y = Math.max(verticalPadding, startY + j * nodeSpacing); - } - } -} - -// ============================================================================ -// Layout: Force-Directed (d3-force) -// ============================================================================ - -function layoutForce(nodes: GraphNode[], edges: GraphEdge[], width: number, height: number): void { - if (nodes.length === 0) return; - - // Seed initial positions: triggers left, agents right - for (const node of nodes) { - if (node.type === 'trigger') { - node.x = width * 0.25 + (Math.random() - 0.5) * 100; - node.y = height * 0.5 + (Math.random() - 0.5) * 200; - } else { - node.x = width * 0.7 + (Math.random() - 0.5) * 100; - node.y = height * 0.5 + (Math.random() - 0.5) * 200; - } - } - - const simulation = forceSimulation(nodes) - .force( - 'link', - forceLink(edges) - .id((d) => d.id) - .distance(220) - .strength(0.5) - ) - .force('charge', forceManyBody().strength(-500)) - .force('center', forceCenter(width / 2, height / 2)) - .force( - 'collide', - forceCollide().radius((d) => Math.max(d.width, d.height) * 0.8) - ) - .force( - 'x', - forceX() - .x((d) => (d.type === 'trigger' ? width * 0.25 : width * 0.7)) - .strength(0.15) - ) - .force('y', forceY(height / 2).strength(0.05)) - .stop(); - - // Run simulation synchronously - for (let i = 0; i < 300; i++) { - simulation.tick(); - } -} - -// ============================================================================ -// Layout dispatcher -// ============================================================================ - -function layoutGraph( - layoutType: CueLayoutType, - nodes: GraphNode[], - edges: GraphEdge[], - width: number, - height: number -): void { - if (layoutType === 'hierarchical') { - layoutHierarchical(nodes, edges, width, height); - } else { - layoutForce(nodes, edges, width, height); - } -} - -// ============================================================================ -// Canvas Rendering -// ============================================================================ - -function roundRect( - ctx: CanvasRenderingContext2D, - x: number, - y: number, - w: number, - h: number, - r: number -): void { - ctx.beginPath(); - ctx.moveTo(x + r, y); - ctx.lineTo(x + w - r, y); - ctx.quadraticCurveTo(x + w, y, x + w, y + r); - ctx.lineTo(x + w, y + h - r); - ctx.quadraticCurveTo(x + w, y + h, x + w - r, y + h); - ctx.lineTo(x + r, y + h); - ctx.quadraticCurveTo(x, y + h, x, y + h - r); - ctx.lineTo(x, y + r); - ctx.quadraticCurveTo(x, y, x + r, y); - ctx.closePath(); -} - -function truncateText(ctx: CanvasRenderingContext2D, text: string, maxWidth: number): string { - if (ctx.measureText(text).width <= maxWidth) return text; - let truncated = text; - while (truncated.length > 0 && ctx.measureText(truncated + '...').width > maxWidth) { - truncated = truncated.slice(0, -1); - } - return truncated + '...'; -} - -function drawArrowhead( - ctx: CanvasRenderingContext2D, - toX: number, - toY: number, - angle: number, - size: number, - color: string -): void { - ctx.fillStyle = color; - ctx.beginPath(); - ctx.moveTo(toX, toY); - ctx.lineTo( - toX - size * Math.cos(angle - Math.PI / 6), - toY - size * Math.sin(angle - Math.PI / 6) - ); - ctx.lineTo( - toX - size * Math.cos(angle + Math.PI / 6), - toY - size * Math.sin(angle + Math.PI / 6) - ); - ctx.closePath(); - ctx.fill(); -} - -function renderGraph( - ctx: CanvasRenderingContext2D, - nodes: GraphNode[], - edges: GraphEdge[], - theme: Theme, - transform: { zoom: number; panX: number; panY: number }, - hoveredNodeId: string | null, - selectedNodeId: string | null, - draggingNodeId: string | null, - canvasWidth: number, - canvasHeight: number -): void { - const dpr = window.devicePixelRatio || 1; - - ctx.setTransform(dpr, 0, 0, dpr, 0, 0); - ctx.clearRect(0, 0, canvasWidth, canvasHeight); - - // Apply transform - ctx.save(); - ctx.translate(transform.panX, transform.panY); - ctx.scale(transform.zoom, transform.zoom); - - // Draw edges - for (const edge of edges) { - const source = edge.source as GraphNode; - const target = edge.target as GraphNode; - if (!source.x || !source.y || !target.x || !target.y) continue; - - const sx = source.x + source.width / 2; - const sy = source.y; - const tx = target.x - target.width / 2; - const ty = target.y; - - // Determine edge color based on source type - const edgeColor = - source.type === 'trigger' - ? (EVENT_COLORS[source.eventType ?? ''] ?? theme.colors.textDim) - : '#22c55e'; - - ctx.strokeStyle = edgeColor + '80'; - ctx.lineWidth = 2; - ctx.setLineDash([]); - - // Bezier curve - const dx = Math.abs(tx - sx); - const controlOffset = Math.min(dx * 0.4, 120); - - ctx.beginPath(); - ctx.moveTo(sx, sy); - ctx.bezierCurveTo(sx + controlOffset, sy, tx - controlOffset, ty, tx, ty); - ctx.stroke(); - - // Arrowhead - const angle = Math.atan2(ty - (ty - 0), tx - (tx - controlOffset)); - drawArrowhead(ctx, tx, ty, angle, 8, edgeColor + 'cc'); - - // Edge label - const midX = (sx + tx) / 2; - const midY = (sy + ty) / 2 - 8; - ctx.font = '10px -apple-system, BlinkMacSystemFont, sans-serif'; - ctx.fillStyle = theme.colors.textDim; - ctx.textAlign = 'center'; - ctx.textBaseline = 'middle'; - const labelText = truncateText(ctx, edge.label, 120); - ctx.fillText(labelText, midX, midY); - } - - // Draw nodes - for (const node of nodes) { - if (node.x === undefined || node.y === undefined) continue; - - const nx = node.x - node.width / 2; - const ny = node.y - node.height / 2; - const isHovered = hoveredNodeId === node.id; - const isSelected = selectedNodeId === node.id; - const isDragging = draggingNodeId === node.id; - - if (node.type === 'trigger') { - // Trigger node - pill shape with event color - const color = EVENT_COLORS[node.eventType ?? ''] ?? CUE_TEAL; - - roundRect(ctx, nx, ny, node.width, node.height, NODE_BORDER_RADIUS); - ctx.fillStyle = color + '18'; - ctx.fill(); - ctx.strokeStyle = isHovered || isSelected || isDragging ? color : color + '60'; - ctx.lineWidth = isHovered || isSelected || isDragging ? 2 : 1; - ctx.stroke(); - - // Event type label - ctx.font = 'bold 11px -apple-system, BlinkMacSystemFont, sans-serif'; - ctx.fillStyle = color; - ctx.textAlign = 'center'; - ctx.textBaseline = 'middle'; - ctx.fillText(node.label, node.x, node.y - 7); - - // Detail - ctx.font = '10px -apple-system, BlinkMacSystemFont, sans-serif'; - ctx.fillStyle = theme.colors.textDim; - const detailText = truncateText(ctx, node.sublabel, node.width - 20); - ctx.fillText(detailText, node.x, node.y + 8); - } else { - // Agent node - card style - const isAgentCompleted = edges.some( - (e) => (e.source as GraphNode).type === 'agent' && (e.target as GraphNode).id === node.id - ); - const accentColor = isAgentCompleted ? '#22c55e' : CUE_TEAL; - - // Background - roundRect(ctx, nx, ny, node.width, node.height, NODE_BORDER_RADIUS); - ctx.fillStyle = theme.colors.bgActivity; - ctx.fill(); - - // Border - ctx.strokeStyle = isHovered || isSelected || isDragging ? accentColor : theme.colors.border; - ctx.lineWidth = isHovered || isSelected || isDragging ? 2 : 1; - ctx.stroke(); - - // Accent bar at top - ctx.save(); - ctx.beginPath(); - ctx.moveTo(nx + NODE_BORDER_RADIUS, ny); - ctx.lineTo(nx + node.width - NODE_BORDER_RADIUS, ny); - ctx.quadraticCurveTo(nx + node.width, ny, nx + node.width, ny + NODE_BORDER_RADIUS); - ctx.lineTo(nx + node.width, ny + 4); - ctx.lineTo(nx, ny + 4); - ctx.lineTo(nx, ny + NODE_BORDER_RADIUS); - ctx.quadraticCurveTo(nx, ny, nx + NODE_BORDER_RADIUS, ny); - ctx.closePath(); - ctx.fillStyle = accentColor; - ctx.fill(); - ctx.restore(); - - // Agent name - ctx.font = 'bold 12px -apple-system, BlinkMacSystemFont, sans-serif'; - ctx.fillStyle = theme.colors.textMain; - ctx.textAlign = 'center'; - ctx.textBaseline = 'middle'; - const nameText = truncateText(ctx, node.label, node.width - 20); - ctx.fillText(nameText, node.x, node.y - 4); - - // Tool type - ctx.font = '10px -apple-system, BlinkMacSystemFont, sans-serif'; - ctx.fillStyle = theme.colors.textDim; - ctx.fillText(node.sublabel, node.x, node.y + 12); - - // Subscription count badge - if (node.subscriptionCount && node.subscriptionCount > 0) { - const badgeText = `${node.subscriptionCount}`; - ctx.font = 'bold 9px -apple-system, BlinkMacSystemFont, sans-serif'; - const badgeWidth = Math.max(ctx.measureText(badgeText).width + 8, 18); - const badgeX = nx + node.width - badgeWidth - 6; - const badgeY = ny + node.height - 16; - - roundRect(ctx, badgeX, badgeY, badgeWidth, 14, 7); - ctx.fillStyle = accentColor; - ctx.fill(); - - ctx.fillStyle = '#fff'; - ctx.textAlign = 'center'; - ctx.textBaseline = 'middle'; - ctx.fillText(badgeText, badgeX + badgeWidth / 2, badgeY + 7); - } - } - } - - ctx.restore(); -} - -// ============================================================================ -// Hit Testing -// ============================================================================ - -function hitTest( - nodes: GraphNode[], - x: number, - y: number, - transform: { zoom: number; panX: number; panY: number } -): GraphNode | null { - // Convert screen coords to graph coords - const gx = (x - transform.panX) / transform.zoom; - const gy = (y - transform.panY) / transform.zoom; - - // Check nodes in reverse order (top-most first) - for (let i = nodes.length - 1; i >= 0; i--) { - const node = nodes[i]; - if (node.x === undefined || node.y === undefined) continue; - - const nx = node.x - node.width / 2; - const ny = node.y - node.height / 2; - - if (gx >= nx && gx <= nx + node.width && gy >= ny && gy <= ny + node.height) { - return node; - } - } - - return null; -} - -// ============================================================================ -// Component -// ============================================================================ - -export function CueGraphView({ theme, onClose }: CueGraphViewProps) { - const canvasRef = useRef(null); - const containerRef = useRef(null); - const [graphData, setGraphData] = useState(null); - const [loading, setLoading] = useState(true); - const [dimensions, setDimensions] = useState({ width: 800, height: 500 }); - const [hoveredNodeId, setHoveredNodeId] = useState(null); - const [selectedNodeId, setSelectedNodeId] = useState(null); - const [layoutType, setLayoutType] = useState('hierarchical'); - const [showLayoutDropdown, setShowLayoutDropdown] = useState(false); - - const transformRef = useRef({ zoom: 1, panX: 0, panY: 0 }); - const isPanningRef = useRef(false); - const lastMouseRef = useRef({ x: 0, y: 0 }); - const rafRef = useRef(0); - - // Node dragging state - const draggingNodeRef = useRef<{ - nodeId: string; - startX: number; - startY: number; - mouseX: number; - mouseY: number; - } | null>(null); - const [draggingNodeId, setDraggingNodeId] = useState(null); - const nodePositionOverrides = useRef>(new Map()); - - const sessions = useSessionStore((state) => state.sessions); - const setActiveSessionId = useSessionStore((state) => state.setActiveSessionId); - - // Fetch graph data - const fetchGraphData = useCallback(async () => { - setLoading(true); - try { - const data = await window.maestro.cue.getGraphData(); - const allSessionsSimple = sessions.map((s) => ({ - id: s.id, - name: s.name, - toolType: s.toolType, - })); - const graph = buildGraphData(data, allSessionsSimple); - layoutGraph(layoutType, graph.nodes, graph.edges, dimensions.width, dimensions.height); - // Clear position overrides on fresh data - nodePositionOverrides.current.clear(); - setGraphData(graph); - } catch { - setGraphData({ nodes: [], edges: [] }); - } finally { - setLoading(false); - } - }, [sessions, dimensions.width, dimensions.height, layoutType]); - - useEffect(() => { - fetchGraphData(); - }, [fetchGraphData]); - - // Observe container size - useEffect(() => { - const container = containerRef.current; - if (!container) return; - - const observer = new ResizeObserver((entries) => { - for (const entry of entries) { - const { width, height } = entry.contentRect; - if (width > 0 && height > 0) { - setDimensions({ width, height }); - } - } - }); - - observer.observe(container); - return () => observer.disconnect(); - }, []); - - // Re-layout when dimensions or layout type change - useEffect(() => { - if (graphData && graphData.nodes.length > 0) { - layoutGraph( - layoutType, - graphData.nodes, - graphData.edges, - dimensions.width, - dimensions.height - ); - // Apply position overrides - for (const node of graphData.nodes) { - const override = nodePositionOverrides.current.get(node.id); - if (override) { - node.x = override.x; - node.y = override.y; - } - } - // Center the transform - transformRef.current = { zoom: 1, panX: 0, panY: 0 }; - requestDraw(); - } - }, [dimensions.width, dimensions.height, graphData, layoutType]); - - // Canvas setup and rendering - const requestDraw = useCallback(() => { - if (rafRef.current) cancelAnimationFrame(rafRef.current); - rafRef.current = requestAnimationFrame(() => { - const canvas = canvasRef.current; - if (!canvas || !graphData) return; - - const ctx = canvas.getContext('2d'); - if (!ctx) return; - - renderGraph( - ctx, - graphData.nodes, - graphData.edges, - theme, - transformRef.current, - hoveredNodeId, - selectedNodeId, - draggingNodeId, - dimensions.width, - dimensions.height - ); - }); - }, [graphData, theme, hoveredNodeId, selectedNodeId, draggingNodeId, dimensions]); - - useEffect(() => { - requestDraw(); - }, [requestDraw]); - - // Set canvas size with DPR - useEffect(() => { - const canvas = canvasRef.current; - if (!canvas) return; - const dpr = window.devicePixelRatio || 1; - canvas.width = dimensions.width * dpr; - canvas.height = dimensions.height * dpr; - canvas.style.width = `${dimensions.width}px`; - canvas.style.height = `${dimensions.height}px`; - requestDraw(); - }, [dimensions, requestDraw]); - - // Close layout dropdown on click outside - useEffect(() => { - if (!showLayoutDropdown) return; - const handleClick = () => setShowLayoutDropdown(false); - document.addEventListener('click', handleClick); - return () => document.removeEventListener('click', handleClick); - }, [showLayoutDropdown]); - - // Mouse handlers - const handleMouseDown = useCallback( - (e: React.MouseEvent) => { - const rect = canvasRef.current?.getBoundingClientRect(); - if (!rect || !graphData) return; - - const x = e.clientX - rect.left; - const y = e.clientY - rect.top; - - const node = hitTest(graphData.nodes, x, y, transformRef.current); - if (node) { - setSelectedNodeId(node.id); - // Start node drag - draggingNodeRef.current = { - nodeId: node.id, - startX: node.x!, - startY: node.y!, - mouseX: e.clientX, - mouseY: e.clientY, - }; - setDraggingNodeId(node.id); - } else { - setSelectedNodeId(null); - isPanningRef.current = true; - lastMouseRef.current = { x: e.clientX, y: e.clientY }; - } - }, - [graphData] - ); - - const handleMouseMove = useCallback( - (e: React.MouseEvent) => { - if (draggingNodeRef.current && graphData) { - // Dragging a node - const { nodeId, startX, startY, mouseX, mouseY } = draggingNodeRef.current; - const zoom = transformRef.current.zoom; - const deltaX = (e.clientX - mouseX) / zoom; - const deltaY = (e.clientY - mouseY) / zoom; - const newX = startX + deltaX; - const newY = startY + deltaY; - - // Update node position - const node = graphData.nodes.find((n) => n.id === nodeId); - if (node) { - node.x = newX; - node.y = newY; - nodePositionOverrides.current.set(nodeId, { x: newX, y: newY }); - } - - if (canvasRef.current) { - canvasRef.current.style.cursor = 'grabbing'; - } - requestDraw(); - return; - } - - if (isPanningRef.current) { - const dx = e.clientX - lastMouseRef.current.x; - const dy = e.clientY - lastMouseRef.current.y; - transformRef.current.panX += dx; - transformRef.current.panY += dy; - lastMouseRef.current = { x: e.clientX, y: e.clientY }; - requestDraw(); - return; - } - - const rect = canvasRef.current?.getBoundingClientRect(); - if (!rect || !graphData) return; - - const x = e.clientX - rect.left; - const y = e.clientY - rect.top; - const node = hitTest(graphData.nodes, x, y, transformRef.current); - const newHoveredId = node?.id ?? null; - if (newHoveredId !== hoveredNodeId) { - setHoveredNodeId(newHoveredId); - } - - // Cursor style - if (canvasRef.current) { - canvasRef.current.style.cursor = node ? 'grab' : 'default'; - } - }, - [graphData, hoveredNodeId, requestDraw] - ); - - const handleMouseUp = useCallback(() => { - draggingNodeRef.current = null; - setDraggingNodeId(null); - isPanningRef.current = false; - if (canvasRef.current) { - canvasRef.current.style.cursor = hoveredNodeId ? 'grab' : 'default'; - } - }, [hoveredNodeId]); - - const handleDoubleClick = useCallback( - (e: React.MouseEvent) => { - const rect = canvasRef.current?.getBoundingClientRect(); - if (!rect || !graphData) return; - - const x = e.clientX - rect.left; - const y = e.clientY - rect.top; - const node = hitTest(graphData.nodes, x, y, transformRef.current); - - if (node?.type === 'agent' && node.sessionId) { - setActiveSessionId(node.sessionId); - onClose(); - } - }, - [graphData, setActiveSessionId, onClose] - ); - - const handleWheel = useCallback( - (e: React.WheelEvent) => { - e.preventDefault(); - const rect = canvasRef.current?.getBoundingClientRect(); - if (!rect) return; - - const mouseX = e.clientX - rect.left; - const mouseY = e.clientY - rect.top; - - const zoomFactor = e.deltaY < 0 ? 1.08 : 1 / 1.08; - const newZoom = Math.max(0.2, Math.min(3, transformRef.current.zoom * zoomFactor)); - - // Zoom toward mouse position - const scale = newZoom / transformRef.current.zoom; - transformRef.current.panX = mouseX - scale * (mouseX - transformRef.current.panX); - transformRef.current.panY = mouseY - scale * (mouseY - transformRef.current.panY); - transformRef.current.zoom = newZoom; - - requestDraw(); - }, - [requestDraw] - ); - - // Handle layout type change - const handleLayoutTypeChange = useCallback((type: CueLayoutType) => { - // Clear position overrides when changing layout - nodePositionOverrides.current.clear(); - setLayoutType(type); - setShowLayoutDropdown(false); - }, []); - - // Selected node info - const selectedNode = useMemo( - () => graphData?.nodes.find((n) => n.id === selectedNodeId) ?? null, - [graphData, selectedNodeId] - ); - - if (loading) { - return ( -
- Loading Cue graph... -
- ); - } - - if (!graphData || graphData.nodes.length === 0) { - return ( -
- - No Cue subscriptions found. Create .maestro/cue.yaml in a project to see the graph. - -
- ); - } - - return ( -
- {/* Toolbar */} -
-
- - {graphData.nodes.filter((n) => n.type === 'agent').length} agents - · - {graphData.nodes.filter((n) => n.type === 'trigger').length} triggers - · - {graphData.edges.length} connections - - - {/* Layout dropdown */} -
- - - {showLayoutDropdown && ( -
- {(['hierarchical', 'force'] as CueLayoutType[]).map((type) => ( - - ))} -
- )} -
-
-
- {selectedNode?.type === 'agent' && selectedNode.sessionId && ( - - )} - - - Double-click agent to switch focus - -
-
- - {/* Canvas */} -
- -
-
- ); -} diff --git a/src/renderer/components/CueModal.tsx b/src/renderer/components/CueModal.tsx index 6be1ebd38..7a8b6d6de 100644 --- a/src/renderer/components/CueModal.tsx +++ b/src/renderer/components/CueModal.tsx @@ -24,40 +24,15 @@ import { MODAL_PRIORITIES } from '../constants/modalPriorities'; import { useCue } from '../hooks/useCue'; import type { CueSessionStatus, CueRunResult } from '../hooks/useCue'; import { CueHelpContent } from './CueHelpModal'; -// Kept for reference - visual pipeline editor replaces this -// import { CueGraphView } from './CueGraphView'; import { CuePipelineEditor } from './CuePipelineEditor'; import { useSessionStore } from '../stores/sessionStore'; import { getModalActions } from '../stores/modalStore'; -import type { CuePipeline } from '../../shared/cue-pipeline-types'; +import type { CuePipeline, CueGraphSession } from '../../shared/cue-pipeline-types'; import { getPipelineColorForAgent } from './CuePipelineEditor/pipelineColors'; import { graphSessionsToPipelines } from './CuePipelineEditor/utils/yamlToPipeline'; type CueModalTab = 'dashboard' | 'pipeline'; -interface CueGraphSession { - sessionId: string; - sessionName: string; - toolType: string; - subscriptions: Array<{ - name: string; - event: string; - enabled: boolean; - prompt?: string; - output_prompt?: string; - interval_minutes?: number; - schedule_times?: string[]; - schedule_days?: string[]; - watch?: string; - source_session?: string | string[]; - fan_out?: string[]; - filter?: Record; - repo?: string; - poll_minutes?: number; - agent_id?: string; - }>; -} - interface CueModalProps { theme: Theme; onClose: () => void; @@ -1029,7 +1004,6 @@ export function CueModal({ theme, onClose, cueShortcutKeys }: CueModalProps) { , document.body )} - {showHelp && } ); } diff --git a/src/renderer/components/CuePipelineEditor/CuePipelineEditor.tsx b/src/renderer/components/CuePipelineEditor/CuePipelineEditor.tsx index bfc49986d..9a2564dc4 100644 --- a/src/renderer/components/CuePipelineEditor/CuePipelineEditor.tsx +++ b/src/renderer/components/CuePipelineEditor/CuePipelineEditor.tsx @@ -26,6 +26,7 @@ import type { Theme } from '../../types'; import type { CuePipelineState, CuePipeline, + CueGraphSession, PipelineNode, PipelineEdge as PipelineEdgeType, PipelineLayoutState, @@ -54,30 +55,6 @@ import { CueSettingsPanel } from './panels/CueSettingsPanel'; import type { CueSettings } from '../../../main/cue/cue-types'; import { DEFAULT_CUE_SETTINGS } from '../../../main/cue/cue-types'; -interface CueGraphSession { - sessionId: string; - sessionName: string; - toolType: string; - subscriptions: Array<{ - name: string; - event: string; - enabled: boolean; - prompt?: string; - output_prompt?: string; - interval_minutes?: number; - schedule_times?: string[]; - schedule_days?: string[]; - watch?: string; - source_session?: string | string[]; - fan_out?: string[]; - filter?: Record; - repo?: string; - poll_minutes?: number; - agent_id?: string; - label?: string; - }>; -} - interface SessionInfo { id: string; groupId?: string; diff --git a/src/shared/cue-pipeline-types.ts b/src/shared/cue-pipeline-types.ts index 8000f040d..4c4b7c80c 100644 --- a/src/shared/cue-pipeline-types.ts +++ b/src/shared/cue-pipeline-types.ts @@ -114,6 +114,34 @@ export interface PipelineLayoutState { viewport?: PipelineViewport; } +/** Session data with subscriptions for the Cue graph/pipeline visualization (renderer-safe mirror of cue-types.ts CueGraphSession) */ +export interface CueGraphSession { + sessionId: string; + sessionName: string; + toolType: string; + subscriptions: Array<{ + name: string; + event: string; + enabled: boolean; + prompt?: string; + prompt_file?: string; + output_prompt?: string; + output_prompt_file?: string; + interval_minutes?: number; + schedule_times?: string[]; + schedule_days?: string[]; + watch?: string; + source_session?: string | string[]; + fan_out?: string[]; + filter?: Record; + repo?: string; + poll_minutes?: number; + gh_state?: string; + agent_id?: string; + label?: string; + }>; +} + /** Returns the first unused color from the palette, cycling if all used. */ export function getNextPipelineColor(existingPipelines: CuePipeline[]): string { const usedColors = new Set(existingPipelines.map((p) => p.color)); From ad43e04a0fe70b69b3663390866a1256590264b0 Mon Sep 17 00:00:00 2001 From: Raza Rauf Date: Tue, 17 Mar 2026 18:29:04 -0600 Subject: [PATCH 2/7] ci: added rc branch to workflows --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ca2171221..92b170c39 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,6 +4,7 @@ on: pull_request: branches: - main + - rc - '*-RC' push: branches: [main] From cea731907ef88d55937673ca7998f6360a2dc8bc Mon Sep 17 00:00:00 2001 From: Raza Rauf Date: Tue, 17 Mar 2026 18:48:57 -0600 Subject: [PATCH 3/7] fix(cue): free concurrency slot on stopRun, refresh nextTriggers after scheduled fire - stopRun now eagerly decrements activeRunCount and drains the queue, so queued events dispatch immediately instead of waiting for the async finally block (which may never run if onCueRun hangs) - finally block skips its decrement for manually stopped runs to prevent double-decrement race with newly drained runs - setupScheduledSubscription now recalculates nextTriggers after each fire so the UI shows the next future time, not a stale past one - stopAll uses snapshot of keys to avoid mutating Map during iteration - Added regression tests for both fixes (477 total cue tests passing) --- .../main/cue/cue-concurrency.test.ts | 47 +++++++++++++++++++ src/__tests__/main/cue/cue-engine.test.ts | 41 ++++++++++++++++ src/main/cue/cue-run-manager.ts | 22 ++++++--- src/main/cue/cue-subscription-setup.ts | 7 +++ 4 files changed, 111 insertions(+), 6 deletions(-) diff --git a/src/__tests__/main/cue/cue-concurrency.test.ts b/src/__tests__/main/cue/cue-concurrency.test.ts index 03faa475e..6366f945f 100644 --- a/src/__tests__/main/cue/cue-concurrency.test.ts +++ b/src/__tests__/main/cue/cue-concurrency.test.ts @@ -457,6 +457,53 @@ describe('CueEngine Concurrency Control', () => { }); }); + describe('stopRun concurrency slot release', () => { + it('stopRun frees the concurrency slot so queued events dispatch immediately', async () => { + const deps = createMockDeps({ + onCueRun: vi.fn(() => new Promise(() => {})), // Never resolves + }); + const config = createMockConfig({ + settings: { + timeout_minutes: 30, + timeout_on_fail: 'break', + max_concurrent: 1, + queue_size: 10, + }, + subscriptions: [ + { + name: 'timer', + event: 'time.heartbeat', + enabled: true, + prompt: 'test', + interval_minutes: 1, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const engine = new CueEngine(deps); + engine.start(); + + // First run starts immediately + await vi.advanceTimersByTimeAsync(10); + expect(engine.getActiveRuns()).toHaveLength(1); + + // Second event gets queued (max_concurrent = 1) + vi.advanceTimersByTime(1 * 60 * 1000); + expect(engine.getQueueStatus().get('session-1')).toBe(1); + + // Stop the active run — should free the slot and drain the queue + const activeRun = engine.getActiveRuns()[0]; + engine.stopRun(activeRun.runId); + + // The queued event should have been dispatched (onCueRun called again) + expect(deps.onCueRun).toHaveBeenCalledTimes(2); + expect(engine.getQueueStatus().size).toBe(0); + + engine.stopAll(); + engine.stop(); + }); + }); + describe('clearQueue', () => { it('clears queued events for a specific session', async () => { const deps = createMockDeps({ diff --git a/src/__tests__/main/cue/cue-engine.test.ts b/src/__tests__/main/cue/cue-engine.test.ts index 669f99610..fedfeb388 100644 --- a/src/__tests__/main/cue/cue-engine.test.ts +++ b/src/__tests__/main/cue/cue-engine.test.ts @@ -1888,6 +1888,47 @@ describe('CueEngine', () => { engine.stop(); }); + it('refreshes nextTriggers after time.scheduled fires', async () => { + // Monday 2026-03-09 at 08:59 — next trigger should be 09:00 today + vi.setSystemTime(new Date('2026-03-09T08:59:00')); + + const config = createMockConfig({ + subscriptions: [ + { + name: 'refresh-schedule', + event: 'time.scheduled', + enabled: true, + prompt: 'check', + schedule_times: ['09:00'], + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const deps = createMockDeps(); + const engine = new CueEngine(deps); + engine.start(); + + const statusBefore = engine.getStatus(); + const subBefore = statusBefore.find((s) => s.sessionId === 'session-1'); + const nextBefore = subBefore!.nextTrigger!; + // nextTrigger should be pointing at 09:00 today (ISO string) + const nextBeforeDate = new Date(nextBefore); + expect(nextBeforeDate.getHours()).toBe(9); + expect(nextBeforeDate.getMinutes()).toBe(0); + + // Advance to 09:00 — the subscription fires + vi.advanceTimersByTime(60_000); + await vi.advanceTimersByTimeAsync(10); + + // After firing, nextTrigger should have advanced to a future time (tomorrow 09:00) + const statusAfter = engine.getStatus(); + const subAfter = statusAfter.find((s) => s.sessionId === 'session-1'); + expect(subAfter!.nextTrigger).toBeDefined(); + expect(new Date(subAfter!.nextTrigger!).getTime()).toBeGreaterThan(nextBeforeDate.getTime()); + + engine.stop(); + }); + it('uses prompt_file when configured', async () => { // Monday at 08:59 — fires at 09:00 vi.setSystemTime(new Date('2026-03-09T08:59:00')); diff --git a/src/main/cue/cue-run-manager.ts b/src/main/cue/cue-run-manager.ts index b630afe03..31948c51a 100644 --- a/src/main/cue/cue-run-manager.ts +++ b/src/main/cue/cue-run-manager.ts @@ -268,12 +268,15 @@ export function createCueRunManager(deps: CueRunManagerDeps): CueRunManager { result.durationMs = Date.now() - new Date(result.startedAt).getTime(); activeRuns.delete(runId); - // Decrement active run count and drain queue - const count = activeRunCount.get(sessionId) ?? 1; - activeRunCount.set(sessionId, Math.max(0, count - 1)); - drainQueue(sessionId); - const wasManuallyStopped = manuallyStoppedRuns.has(runId); + + // Only decrement here for non-stopped runs — stopRun already decremented eagerly + if (!wasManuallyStopped) { + const count = activeRunCount.get(sessionId) ?? 1; + activeRunCount.set(sessionId, Math.max(0, count - 1)); + drainQueue(sessionId); + } + if (wasManuallyStopped) { try { updateCueEventStatus(runId, 'stopped'); @@ -363,6 +366,13 @@ export function createCueRunManager(deps: CueRunManagerDeps): CueRunManager { run.result.durationMs = Date.now() - new Date(run.result.startedAt).getTime(); activeRuns.delete(runId); + + // Free the concurrency slot immediately so queued events can proceed. + // The finally block in doExecuteCueRun skips its decrement for manually stopped runs. + const count = activeRunCount.get(run.result.sessionId) ?? 1; + activeRunCount.set(run.result.sessionId, Math.max(0, count - 1)); + drainQueue(run.result.sessionId); + deps.onRunStopped(run.result); deps.onLog('cue', `[CUE] Run stopped: ${runId}`, { type: 'runStopped', @@ -374,7 +384,7 @@ export function createCueRunManager(deps: CueRunManagerDeps): CueRunManager { }, stopAll(): void { - for (const [runId] of activeRuns) { + for (const runId of [...activeRuns.keys()]) { this.stopRun(runId); } eventQueue.clear(); diff --git a/src/main/cue/cue-subscription-setup.ts b/src/main/cue/cue-subscription-setup.ts index e52867507..32d897b01 100644 --- a/src/main/cue/cue-subscription-setup.ts +++ b/src/main/cue/cue-subscription-setup.ts @@ -219,6 +219,13 @@ export function setupScheduledSubscription( deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.scheduled, ${currentTime})`); state.lastTriggered = event.timestamp; + + // Refresh next trigger time so the UI shows a future time, not this past one + const nextMs = calculateNextScheduledTime(times, sub.schedule_days); + if (nextMs != null) { + state.nextTriggers.set(sub.name, nextMs); + } + deps.executeCueRun( session.id, sub.prompt_file ?? sub.prompt, From e751a31536b15da083c075ec427e945c270fdaa2 Mon Sep 17 00:00:00 2001 From: Raza Rauf Date: Tue, 17 Mar 2026 18:57:09 -0600 Subject: [PATCH 4/7] fix(cue): route reconciler through dispatchSubscription, harden fan-in and types - Heartbeat onDispatch now calls dispatchSubscription instead of runManager.execute directly, so fan-out works for reconciled events - Added getActiveRunCount(sessionId) to CueRunManager, used by getStatus() instead of reaching into getActiveRunMap() internals - Guard Math.max in fan-in-tracker against empty completions array to avoid -Infinity chainDepth propagation - CueGraphSession.event typed as CueEventType instead of string --- src/main/cue/cue-engine.ts | 12 ++---------- src/main/cue/cue-fan-in-tracker.ts | 6 ++++-- src/main/cue/cue-run-manager.ts | 5 +++++ src/shared/cue-pipeline-types.ts | 2 +- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/main/cue/cue-engine.ts b/src/main/cue/cue-engine.ts index c9a86a08b..7ca5634e7 100644 --- a/src/main/cue/cue-engine.ts +++ b/src/main/cue/cue-engine.ts @@ -131,13 +131,7 @@ export class CueEngine { return result; }, onDispatch: (sessionId, sub, event) => { - this.runManager.execute( - sessionId, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); + this.dispatchSubscription(sessionId, sub, event, sessionId); }, }); } @@ -269,9 +263,7 @@ export class CueEngine { reportedSessionIds.add(sessionId); - const activeRunCount = [...this.runManager.getActiveRunMap().values()].filter( - (r) => r.result.sessionId === sessionId - ).length; + const activeRunCount = this.runManager.getActiveRunCount(sessionId); let nextTrigger: string | undefined; if (state.nextTriggers.size > 0) { diff --git a/src/main/cue/cue-fan-in-tracker.ts b/src/main/cue/cue-fan-in-tracker.ts index 4401011a8..d59ad892a 100644 --- a/src/main/cue/cue-fan-in-tracker.ts +++ b/src/main/cue/cue-fan-in-tracker.ts @@ -94,7 +94,8 @@ export function createCueFanInTracker(deps: CueFanInDeps): CueFanInTracker { partial: true, }, }; - const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth)); + const maxChainDepth = + completions.length > 0 ? Math.max(...completions.map((c) => c.chainDepth)) : 0; deps.onLog( 'cue', `[CUE] Fan-in "${sub.name}" timed out (continue mode) — firing with ${completedNames.length}/${sources.length} sources` @@ -177,7 +178,8 @@ export function createCueFanInTracker(deps: CueFanInDeps): CueFanInTracker { outputTruncated: completions.some((c) => c.truncated), }, }; - const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth)); + const maxChainDepth = + completions.length > 0 ? Math.max(...completions.map((c) => c.chainDepth)) : 0; deps.onLog('cue', `[CUE] "${sub.name}" triggered (agent.completed, fan-in complete)`); deps.dispatchSubscription( ownerSessionId, diff --git a/src/main/cue/cue-run-manager.ts b/src/main/cue/cue-run-manager.ts index 31948c51a..8f6c9918e 100644 --- a/src/main/cue/cue-run-manager.ts +++ b/src/main/cue/cue-run-manager.ts @@ -69,6 +69,7 @@ export interface CueRunManager { stopRun(runId: string): boolean; stopAll(): void; getActiveRuns(): CueRunResult[]; + getActiveRunCount(sessionId: string): number; getActiveRunMap(): Map; getQueueStatus(): Map; clearQueue(sessionId: string): void; @@ -394,6 +395,10 @@ export function createCueRunManager(deps: CueRunManagerDeps): CueRunManager { return [...activeRuns.values()].map((r) => r.result); }, + getActiveRunCount(sessionId: string): number { + return [...activeRuns.values()].filter((r) => r.result.sessionId === sessionId).length; + }, + getActiveRunMap(): Map { return activeRuns; }, diff --git a/src/shared/cue-pipeline-types.ts b/src/shared/cue-pipeline-types.ts index 4c4b7c80c..33c35cada 100644 --- a/src/shared/cue-pipeline-types.ts +++ b/src/shared/cue-pipeline-types.ts @@ -121,7 +121,7 @@ export interface CueGraphSession { toolType: string; subscriptions: Array<{ name: string; - event: string; + event: CueEventType; enabled: boolean; prompt?: string; prompt_file?: string; From 1175e1675c6d5c7a86ed748ddc5d4ea7d90d3620 Mon Sep 17 00:00:00 2001 From: Raza Rauf Date: Tue, 17 Mar 2026 19:24:15 -0600 Subject: [PATCH 5/7] fix(cue): refresh nextTriggers before filter check in scheduled subscriptions Move calculateNextScheduledTime call above the filter gate so the UI's "Next" column updates even when the filter suppresses the event. Previously nextTriggers would go stale on filtered-out scheduled fires. --- src/main/cue/cue-subscription-setup.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/cue/cue-subscription-setup.ts b/src/main/cue/cue-subscription-setup.ts index 32d897b01..d25c418b4 100644 --- a/src/main/cue/cue-subscription-setup.ts +++ b/src/main/cue/cue-subscription-setup.ts @@ -212,6 +212,12 @@ export function setupScheduledSubscription( }, }; + // Refresh next trigger time regardless of filter outcome so the UI stays current + const nextMs = calculateNextScheduledTime(times, sub.schedule_days); + if (nextMs != null) { + state.nextTriggers.set(sub.name, nextMs); + } + if (sub.filter && !matchesFilter(event.payload, sub.filter)) { deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); return; @@ -219,13 +225,6 @@ export function setupScheduledSubscription( deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.scheduled, ${currentTime})`); state.lastTriggered = event.timestamp; - - // Refresh next trigger time so the UI shows a future time, not this past one - const nextMs = calculateNextScheduledTime(times, sub.schedule_days); - if (nextMs != null) { - state.nextTriggers.set(sub.name, nextMs); - } - deps.executeCueRun( session.id, sub.prompt_file ?? sub.prompt, From 67887ed91c0e9af217521a8167bcfb1990a3fcd2 Mon Sep 17 00:00:00 2001 From: Raza Rauf Date: Tue, 17 Mar 2026 19:33:01 -0600 Subject: [PATCH 6/7] fix(cue): set lastTriggered on initial heartbeat fire The immediate heartbeat dispatch was missing the state.lastTriggered update that the interval branch already performs, leaving the Cue modal's "Last Triggered" column empty until the first interval tick. --- src/main/cue/cue-subscription-setup.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/cue/cue-subscription-setup.ts b/src/main/cue/cue-subscription-setup.ts index d25c418b4..11631caff 100644 --- a/src/main/cue/cue-subscription-setup.ts +++ b/src/main/cue/cue-subscription-setup.ts @@ -105,6 +105,7 @@ export function setupHeartbeatSubscription( // Check payload filter (even for timer events) if (!sub.filter || matchesFilter(immediateEvent.payload, sub.filter)) { deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.heartbeat, initial)`); + state.lastTriggered = immediateEvent.timestamp; deps.executeCueRun( session.id, sub.prompt_file ?? sub.prompt, From d85290c514c32e7dd9d618c3ca4bcf5ddcecb305 Mon Sep 17 00:00:00 2001 From: Raza Rauf Date: Tue, 17 Mar 2026 20:08:14 -0600 Subject: [PATCH 7/7] fix(cue): extend calculateNextScheduledTime to cover same-day-next-week Loop bound was dayOffset < 7 (0..6), missing the 7th day offset. When a single-day schedule's slot had already passed (e.g. Mon 09:00 checked at Mon 09:01), the function returned null because next Monday was at offset 7. Changed to <= 7 so weekly schedules always resolve. --- src/main/cue/cue-subscription-setup.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/cue/cue-subscription-setup.ts b/src/main/cue/cue-subscription-setup.ts index 11631caff..67411bea6 100644 --- a/src/main/cue/cue-subscription-setup.ts +++ b/src/main/cue/cue-subscription-setup.ts @@ -29,8 +29,8 @@ export function calculateNextScheduledTime(times: string[], days?: string[]): nu const now = new Date(); const candidates: number[] = []; - // Check up to 7 days ahead to find the next match - for (let dayOffset = 0; dayOffset < 7; dayOffset++) { + // Check up to 8 days ahead (0..7) to cover same-day-next-week when today's slot has passed + for (let dayOffset = 0; dayOffset <= 7; dayOffset++) { const candidate = new Date(now); candidate.setDate(candidate.getDate() + dayOffset); const dayName = DAY_NAMES[candidate.getDay()];