diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ca2171221..92b170c39 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,6 +4,7 @@ on: pull_request: branches: - main + - rc - '*-RC' push: branches: [main] diff --git a/src/__tests__/main/cue/cue-activity-log.test.ts b/src/__tests__/main/cue/cue-activity-log.test.ts new file mode 100644 index 000000000..cb870a670 --- /dev/null +++ b/src/__tests__/main/cue/cue-activity-log.test.ts @@ -0,0 +1,72 @@ +/** + * Tests for the Cue activity log ring buffer. + */ + +import { describe, it, expect } from 'vitest'; +import { createCueActivityLog } from '../../../main/cue/cue-activity-log'; +import type { CueRunResult } from '../../../main/cue/cue-types'; + +function makeResult(id: string): CueRunResult { + return { + runId: id, + sessionId: 'session-1', + sessionName: 'Test', + subscriptionName: 'sub', + event: { id: 'e1', type: 'time.heartbeat', timestamp: '', triggerName: 'sub', payload: {} }, + status: 'completed', + stdout: '', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: '', + endedAt: '', + }; +} + +describe('createCueActivityLog', () => { + it('stores and retrieves results', () => { + const log = createCueActivityLog(); + log.push(makeResult('r1')); + log.push(makeResult('r2')); + expect(log.getAll()).toHaveLength(2); + expect(log.getAll()[0].runId).toBe('r1'); + }); + + it('respects limit parameter on getAll', () => { + const log = createCueActivityLog(); + log.push(makeResult('r1')); + log.push(makeResult('r2')); + log.push(makeResult('r3')); + const last2 = log.getAll(2); + expect(last2).toHaveLength(2); + expect(last2[0].runId).toBe('r2'); + expect(last2[1].runId).toBe('r3'); + }); + + it('evicts oldest entries when exceeding maxSize', () => { + const log = createCueActivityLog(3); + log.push(makeResult('r1')); + log.push(makeResult('r2')); + log.push(makeResult('r3')); + log.push(makeResult('r4')); + const all = log.getAll(); + expect(all).toHaveLength(3); + expect(all[0].runId).toBe('r2'); + expect(all[2].runId).toBe('r4'); + }); + + it('clear empties the log', () => { + const log = createCueActivityLog(); + log.push(makeResult('r1')); + log.clear(); + expect(log.getAll()).toHaveLength(0); + }); + + it('returns a copy from getAll, not a reference', () => { + const log = createCueActivityLog(); + log.push(makeResult('r1')); + const snapshot = log.getAll(); + log.push(makeResult('r2')); + expect(snapshot).toHaveLength(1); + }); +}); diff --git a/src/__tests__/main/cue/cue-completion-chains.test.ts b/src/__tests__/main/cue/cue-completion-chains.test.ts index cdbd9e1f5..a35c75255 100644 --- a/src/__tests__/main/cue/cue-completion-chains.test.ts +++ b/src/__tests__/main/cue/cue-completion-chains.test.ts @@ -14,7 +14,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { CueConfig, CueEvent } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; // Mock the yaml loader const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); @@ -52,47 +51,7 @@ vi.mock('crypto', () => ({ })); import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} - -function createMockConfig(overrides: Partial = {}): CueConfig { - return { - subscriptions: [], - settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, - ...overrides, - }; -} - -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async () => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: 'test', - event: {} as CueEvent, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })) as CueEngineDeps['onCueRun'], - onLog: vi.fn(), - ...overrides, - }; -} +import { createMockSession, createMockConfig, createMockDeps } from './cue-test-helpers'; describe('CueEngine completion chains', () => { beforeEach(() => { diff --git a/src/__tests__/main/cue/cue-concurrency.test.ts b/src/__tests__/main/cue/cue-concurrency.test.ts index 758e1aee2..6366f945f 100644 --- a/src/__tests__/main/cue/cue-concurrency.test.ts +++ b/src/__tests__/main/cue/cue-concurrency.test.ts @@ -13,7 +13,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; // Mock the yaml loader const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); @@ -35,52 +34,7 @@ vi.mock('crypto', () => ({ })); import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} - -function createMockConfig(overrides: Partial = {}): CueConfig { - return { - subscriptions: [], - settings: { - timeout_minutes: 30, - timeout_on_fail: 'break', - max_concurrent: 1, - queue_size: 10, - }, - ...overrides, - }; -} - -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async () => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: 'test', - event: {} as CueEvent, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })), - onLog: vi.fn(), - ...overrides, - }; -} +import { createMockSession, createMockConfig, createMockDeps } from './cue-test-helpers'; describe('CueEngine Concurrency Control', () => { let yamlWatcherCleanup: ReturnType; @@ -503,6 +457,53 @@ describe('CueEngine Concurrency Control', () => { }); }); + describe('stopRun concurrency slot release', () => { + it('stopRun frees the concurrency slot so queued events dispatch immediately', async () => { + const deps = createMockDeps({ + onCueRun: vi.fn(() => new Promise(() => {})), // Never resolves + }); + const config = createMockConfig({ + settings: { + timeout_minutes: 30, + timeout_on_fail: 'break', + max_concurrent: 1, + queue_size: 10, + }, + subscriptions: [ + { + name: 'timer', + event: 'time.heartbeat', + enabled: true, + prompt: 'test', + interval_minutes: 1, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const engine = new CueEngine(deps); + engine.start(); + + // First run starts immediately + await vi.advanceTimersByTimeAsync(10); + expect(engine.getActiveRuns()).toHaveLength(1); + + // Second event gets queued (max_concurrent = 1) + vi.advanceTimersByTime(1 * 60 * 1000); + expect(engine.getQueueStatus().get('session-1')).toBe(1); + + // Stop the active run — should free the slot and drain the queue + const activeRun = engine.getActiveRuns()[0]; + engine.stopRun(activeRun.runId); + + // The queued event should have been dispatched (onCueRun called again) + expect(deps.onCueRun).toHaveBeenCalledTimes(2); + expect(engine.getQueueStatus().size).toBe(0); + + engine.stopAll(); + engine.stop(); + }); + }); + describe('clearQueue', () => { it('clears queued events for a specific session', async () => { const deps = createMockDeps({ diff --git a/src/__tests__/main/cue/cue-engine.test.ts b/src/__tests__/main/cue/cue-engine.test.ts index 225399e4e..fedfeb388 100644 --- a/src/__tests__/main/cue/cue-engine.test.ts +++ b/src/__tests__/main/cue/cue-engine.test.ts @@ -15,7 +15,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; // Mock the yaml loader const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); @@ -53,48 +52,7 @@ import { calculateNextScheduledTime, type CueEngineDeps, } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} - -function createMockConfig(overrides: Partial = {}): CueConfig { - return { - subscriptions: [], - settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, - ...overrides, - }; -} - -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async (request: Parameters[0]) => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: request.subscriptionName, - event: request.event, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })), - onStopCueRun: vi.fn(() => true), - onLog: vi.fn(), - ...overrides, - }; -} +import { createMockSession, createMockConfig, createMockDeps } from './cue-test-helpers'; describe('CueEngine', () => { let yamlWatcherCleanup: ReturnType; @@ -930,6 +888,39 @@ describe('CueEngine', () => { engine.stop(); }); + it('stopRun adds the stopped run to the activity log', async () => { + const deps = createMockDeps({ + onCueRun: vi.fn(() => new Promise(() => {})), + }); + const config = createMockConfig({ + subscriptions: [ + { + name: 'timer', + event: 'time.heartbeat', + enabled: true, + prompt: 'test', + interval_minutes: 60, + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const engine = new CueEngine(deps); + engine.start(); + + await vi.advanceTimersByTimeAsync(10); + + const activeRun = engine.getActiveRuns()[0]; + expect(activeRun).toBeDefined(); + engine.stopRun(activeRun.runId); + + const log = engine.getActivityLog(); + expect(log).toHaveLength(1); + expect(log[0].runId).toBe(activeRun.runId); + expect(log[0].status).toBe('stopped'); + + engine.stop(); + }); + it('stopAll clears all active runs', async () => { // Use a slow-resolving onCueRun to keep runs active const deps = createMockDeps({ @@ -1897,6 +1888,47 @@ describe('CueEngine', () => { engine.stop(); }); + it('refreshes nextTriggers after time.scheduled fires', async () => { + // Monday 2026-03-09 at 08:59 — next trigger should be 09:00 today + vi.setSystemTime(new Date('2026-03-09T08:59:00')); + + const config = createMockConfig({ + subscriptions: [ + { + name: 'refresh-schedule', + event: 'time.scheduled', + enabled: true, + prompt: 'check', + schedule_times: ['09:00'], + }, + ], + }); + mockLoadCueConfig.mockReturnValue(config); + const deps = createMockDeps(); + const engine = new CueEngine(deps); + engine.start(); + + const statusBefore = engine.getStatus(); + const subBefore = statusBefore.find((s) => s.sessionId === 'session-1'); + const nextBefore = subBefore!.nextTrigger!; + // nextTrigger should be pointing at 09:00 today (ISO string) + const nextBeforeDate = new Date(nextBefore); + expect(nextBeforeDate.getHours()).toBe(9); + expect(nextBeforeDate.getMinutes()).toBe(0); + + // Advance to 09:00 — the subscription fires + vi.advanceTimersByTime(60_000); + await vi.advanceTimersByTimeAsync(10); + + // After firing, nextTrigger should have advanced to a future time (tomorrow 09:00) + const statusAfter = engine.getStatus(); + const subAfter = statusAfter.find((s) => s.sessionId === 'session-1'); + expect(subAfter!.nextTrigger).toBeDefined(); + expect(new Date(subAfter!.nextTrigger!).getTime()).toBeGreaterThan(nextBeforeDate.getTime()); + + engine.stop(); + }); + it('uses prompt_file when configured', async () => { // Monday at 08:59 — fires at 09:00 vi.setSystemTime(new Date('2026-03-09T08:59:00')); diff --git a/src/__tests__/main/cue/cue-multi-hop-chains.test.ts b/src/__tests__/main/cue/cue-multi-hop-chains.test.ts index ad9c82eb9..170f07d70 100644 --- a/src/__tests__/main/cue/cue-multi-hop-chains.test.ts +++ b/src/__tests__/main/cue/cue-multi-hop-chains.test.ts @@ -12,7 +12,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; // Mock the yaml loader const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); @@ -50,47 +49,7 @@ vi.mock('crypto', () => ({ })); import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} - -function createMockConfig(overrides: Partial = {}): CueConfig { - return { - subscriptions: [], - settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, - ...overrides, - }; -} - -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async () => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: 'test', - event: {} as CueEvent, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })) as CueEngineDeps['onCueRun'], - onLog: vi.fn(), - ...overrides, - }; -} +import { createMockSession, createMockConfig, createMockDeps } from './cue-test-helpers'; describe('CueEngine multi-hop completion chains', () => { beforeEach(() => { diff --git a/src/__tests__/main/cue/cue-session-lifecycle.test.ts b/src/__tests__/main/cue/cue-session-lifecycle.test.ts index 185a422eb..916c59405 100644 --- a/src/__tests__/main/cue/cue-session-lifecycle.test.ts +++ b/src/__tests__/main/cue/cue-session-lifecycle.test.ts @@ -12,7 +12,6 @@ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; // Mock the yaml loader const mockLoadCueConfig = vi.fn<(projectRoot: string) => CueConfig | null>(); @@ -50,47 +49,7 @@ vi.mock('crypto', () => ({ })); import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} - -function createMockConfig(overrides: Partial = {}): CueConfig { - return { - subscriptions: [], - settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, - ...overrides, - }; -} - -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async () => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: 'test', - event: {} as CueEvent, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })) as CueEngineDeps['onCueRun'], - onLog: vi.fn(), - ...overrides, - }; -} +import { createMockSession, createMockConfig, createMockDeps } from './cue-test-helpers'; describe('CueEngine session lifecycle', () => { beforeEach(() => { diff --git a/src/__tests__/main/cue/cue-sleep-wake.test.ts b/src/__tests__/main/cue/cue-sleep-wake.test.ts index 022582455..00f47ad6e 100644 --- a/src/__tests__/main/cue/cue-sleep-wake.test.ts +++ b/src/__tests__/main/cue/cue-sleep-wake.test.ts @@ -10,8 +10,7 @@ */ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; -import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; -import type { SessionInfo } from '../../../shared/types'; +import type { CueConfig } from '../../../main/cue/cue-types'; // Track cue-db calls const mockInitCueDb = vi.fn(); @@ -52,19 +51,10 @@ vi.mock('crypto', () => ({ randomUUID: vi.fn(() => `uuid-${Math.random().toString(36).slice(2, 8)}`), })); -import { CueEngine, type CueEngineDeps } from '../../../main/cue/cue-engine'; - -function createMockSession(overrides: Partial = {}): SessionInfo { - return { - id: 'session-1', - name: 'Test Session', - toolType: 'claude-code', - cwd: '/projects/test', - projectRoot: '/projects/test', - ...overrides, - }; -} +import { CueEngine } from '../../../main/cue/cue-engine'; +import { createMockDeps } from './cue-test-helpers'; +/** Sleep-wake tests need a config with a default timer subscription */ function createMockConfig(overrides: Partial = {}): CueConfig { return { subscriptions: [ @@ -81,28 +71,6 @@ function createMockConfig(overrides: Partial = {}): CueConfig { }; } -function createMockDeps(overrides: Partial = {}): CueEngineDeps { - return { - getSessions: vi.fn(() => [createMockSession()]), - onCueRun: vi.fn(async () => ({ - runId: 'run-1', - sessionId: 'session-1', - sessionName: 'Test Session', - subscriptionName: 'test', - event: {} as CueEvent, - status: 'completed' as const, - stdout: 'output', - stderr: '', - exitCode: 0, - durationMs: 100, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - })), - onLog: vi.fn(), - ...overrides, - }; -} - describe('CueEngine sleep/wake detection', () => { beforeEach(() => { vi.clearAllMocks(); diff --git a/src/__tests__/main/cue/cue-test-helpers.ts b/src/__tests__/main/cue/cue-test-helpers.ts new file mode 100644 index 000000000..ddf4b0f01 --- /dev/null +++ b/src/__tests__/main/cue/cue-test-helpers.ts @@ -0,0 +1,54 @@ +/** + * Shared test factories for Cue engine tests. + * + * Provides createMockSession, createMockConfig, and createMockDeps + * used across 6+ Cue test files. Centralizes the factory functions + * to avoid duplication and ensure consistent defaults. + */ + +import { vi } from 'vitest'; +import type { CueConfig, CueEvent, CueRunResult } from '../../../main/cue/cue-types'; +import type { SessionInfo } from '../../../shared/types'; +import type { CueEngineDeps } from '../../../main/cue/cue-engine'; + +export function createMockSession(overrides: Partial = {}): SessionInfo { + return { + id: 'session-1', + name: 'Test Session', + toolType: 'claude-code', + cwd: '/projects/test', + projectRoot: '/projects/test', + ...overrides, + }; +} + +export function createMockConfig(overrides: Partial = {}): CueConfig { + return { + subscriptions: [], + settings: { timeout_minutes: 30, timeout_on_fail: 'break', max_concurrent: 1, queue_size: 10 }, + ...overrides, + }; +} + +export function createMockDeps(overrides: Partial = {}): CueEngineDeps { + return { + getSessions: vi.fn(() => [createMockSession()]), + onCueRun: vi.fn(async (request: Parameters[0]) => ({ + runId: 'run-1', + sessionId: 'session-1', + sessionName: 'Test Session', + subscriptionName: request.subscriptionName, + event: request.event, + status: 'completed' as const, + stdout: 'output', + stderr: '', + exitCode: 0, + durationMs: 100, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + })), + onStopCueRun: vi.fn(() => true), + onLog: vi.fn(), + ...overrides, + }; +} diff --git a/src/__tests__/renderer/components/CueModal.test.tsx b/src/__tests__/renderer/components/CueModal.test.tsx index 012281746..6b4ee2ced 100644 --- a/src/__tests__/renderer/components/CueModal.test.tsx +++ b/src/__tests__/renderer/components/CueModal.test.tsx @@ -41,11 +41,6 @@ vi.mock('../../../renderer/components/CueYamlEditor', () => ({ isOpen ?
YAML Editor Mock
: null, })); -// Mock CueGraphView (kept for reference - replaced by CuePipelineEditor) -vi.mock('../../../renderer/components/CueGraphView', () => ({ - CueGraphView: () => null, -})); - // Capture the onDirtyChange callback from CuePipelineEditor let capturedOnDirtyChange: ((isDirty: boolean) => void) | undefined; diff --git a/src/main/cue/cue-activity-log.ts b/src/main/cue/cue-activity-log.ts new file mode 100644 index 000000000..394b37f29 --- /dev/null +++ b/src/main/cue/cue-activity-log.ts @@ -0,0 +1,40 @@ +/** + * In-memory ring buffer of completed Cue run results. + * + * Keeps the most recent N results for the activity log view + * in the Cue Modal dashboard. + */ + +import type { CueRunResult } from './cue-types'; + +const ACTIVITY_LOG_MAX = 500; + +export interface CueActivityLog { + push(result: CueRunResult): void; + getAll(limit?: number): CueRunResult[]; + clear(): void; +} + +export function createCueActivityLog(maxSize: number = ACTIVITY_LOG_MAX): CueActivityLog { + let log: CueRunResult[] = []; + + return { + push(result: CueRunResult): void { + log.push(result); + if (log.length > maxSize) { + log = log.slice(-maxSize); + } + }, + + getAll(limit?: number): CueRunResult[] { + if (limit !== undefined) { + return log.slice(-limit); + } + return [...log]; + }, + + clear(): void { + log = []; + }, + }; +} diff --git a/src/main/cue/cue-engine.ts b/src/main/cue/cue-engine.ts index 91ac3d8b4..7ca5634e7 100644 --- a/src/main/cue/cue-engine.ts +++ b/src/main/cue/cue-engine.ts @@ -25,67 +25,28 @@ import type { } from './cue-types'; import { DEFAULT_CUE_SETTINGS } from './cue-types'; import { loadCueConfig, watchCueYaml } from './cue-yaml-loader'; -import { createCueFileWatcher } from './cue-file-watcher'; -import { createCueGitHubPoller } from './cue-github-poller'; -import { createCueTaskScanner } from './cue-task-scanner'; import { matchesFilter, describeFilter } from './cue-filter'; import { - initCueDb, - closeCueDb, - updateHeartbeat, - getLastHeartbeat, - pruneCueEvents, - recordCueEvent, - updateCueEventStatus, -} from './cue-db'; -import { reconcileMissedTimeEvents } from './cue-reconciler'; -import type { ReconcileSessionInfo } from './cue-reconciler'; - -const ACTIVITY_LOG_MAX = 500; -const DEFAULT_FILE_DEBOUNCE_MS = 5000; -const SOURCE_OUTPUT_MAX_CHARS = 5000; -const HEARTBEAT_INTERVAL_MS = 30_000; // 30 seconds -const SLEEP_THRESHOLD_MS = 120_000; // 2 minutes -const EVENT_PRUNE_AGE_MS = 7 * 24 * 60 * 60 * 1000; // 7 days + setupHeartbeatSubscription, + setupScheduledSubscription, + setupFileWatcherSubscription, + setupGitHubPollerSubscription, + setupTaskScannerSubscription, + type SubscriptionSetupDeps, +} from './cue-subscription-setup'; +import { initCueDb, closeCueDb, pruneCueEvents } from './cue-db'; +import { createCueActivityLog } from './cue-activity-log'; +import type { CueActivityLog } from './cue-activity-log'; +import { createCueHeartbeat, EVENT_PRUNE_AGE_MS } from './cue-heartbeat'; +import type { CueHeartbeat } from './cue-heartbeat'; +import { createCueFanInTracker, SOURCE_OUTPUT_MAX_CHARS } from './cue-fan-in-tracker'; +import type { CueFanInTracker } from './cue-fan-in-tracker'; +import { createCueRunManager } from './cue-run-manager'; +import type { CueRunManager } from './cue-run-manager'; const MAX_CHAIN_DEPTH = 10; -const DAY_NAMES = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'] as const; - -/** - * Calculates the next occurrence of a scheduled time. - * Returns a timestamp in ms, or null if inputs are invalid. - */ -export function calculateNextScheduledTime(times: string[], days?: string[]): number | null { - if (times.length === 0) return null; - - const now = new Date(); - const candidates: number[] = []; - - // Check up to 7 days ahead to find the next match - for (let dayOffset = 0; dayOffset < 7; dayOffset++) { - const candidate = new Date(now); - candidate.setDate(candidate.getDate() + dayOffset); - const dayName = DAY_NAMES[candidate.getDay()]; - - if (days && days.length > 0 && !days.includes(dayName)) continue; - - for (const time of times) { - const [hourStr, minStr] = time.split(':'); - const hour = parseInt(hourStr, 10); - const min = parseInt(minStr, 10); - if (isNaN(hour) || isNaN(min)) continue; - - const target = new Date(candidate); - target.setHours(hour, min, 0, 0); - - if (target.getTime() > now.getTime()) { - candidates.push(target.getTime()); - } - } - } - - return candidates.length > 0 ? Math.min(...candidates) : null; -} +// Re-export for backwards compat (tests import from cue-engine) +export { calculateNextScheduledTime } from './cue-subscription-setup'; /** Dependencies injected into the CueEngine */ export interface CueEngineDeps { @@ -112,50 +73,67 @@ interface SessionState { nextTriggers: Map; // subscriptionName -> next trigger timestamp } -/** Active run tracking */ -interface ActiveRun { - result: CueRunResult; - abortController?: AbortController; -} - -/** Stored data for a single fan-in source completion */ -interface FanInSourceCompletion { - sessionId: string; - sessionName: string; - output: string; - truncated: boolean; - chainDepth: number; -} - -/** A queued event waiting for a concurrency slot */ -interface QueuedEvent { - event: CueEvent; - subscription: CueSubscription; - prompt: string; - outputPrompt?: string; - subscriptionName: string; - queuedAt: number; - chainDepth?: number; -} - export class CueEngine { private enabled = false; private sessions = new Map(); - private activeRuns = new Map(); - private activityLog: CueRunResult[] = []; - private fanInTrackers = new Map>(); - private fanInTimers = new Map>(); + private activityLog: CueActivityLog = createCueActivityLog(); + private fanInTracker!: CueFanInTracker; + private runManager!: CueRunManager; private pendingYamlWatchers = new Map void>(); - private activeRunCount = new Map(); - private eventQueue = new Map(); - private manuallyStoppedRuns = new Set(); /** Tracks "subName:HH:MM" keys that time.scheduled already fired, preventing double-fire on config refresh */ private scheduledFiredKeys = new Set(); - private heartbeatInterval: ReturnType | null = null; + private heartbeat: CueHeartbeat; private deps: CueEngineDeps; constructor(deps: CueEngineDeps) { this.deps = deps; + this.runManager = createCueRunManager({ + getSessions: deps.getSessions, + getSessionSettings: (sessionId) => this.sessions.get(sessionId)?.config.settings, + onCueRun: deps.onCueRun, + onStopCueRun: deps.onStopCueRun, + onLog: deps.onLog, + onRunCompleted: (sessionId, result, subscriptionName, chainDepth) => { + this.pushActivityLog(result); + this.notifyAgentCompleted(sessionId, { + sessionName: result.sessionName, + status: result.status, + exitCode: result.exitCode, + durationMs: result.durationMs, + stdout: result.stdout, + triggeredBy: subscriptionName, + chainDepth: (chainDepth ?? 0) + 1, + }); + }, + onRunStopped: (result) => { + this.pushActivityLog(result); + }, + }); + this.fanInTracker = createCueFanInTracker({ + onLog: deps.onLog, + getSessions: deps.getSessions, + dispatchSubscription: (ownerSessionId, sub, event, sourceSessionName, chainDepth) => { + this.dispatchSubscription(ownerSessionId, sub, event, sourceSessionName, chainDepth); + }, + }); + this.heartbeat = createCueHeartbeat({ + onLog: deps.onLog, + getSessions: () => { + const result = new Map(); + const allSessions = deps.getSessions(); + for (const [sessionId, state] of this.sessions) { + const session = allSessions.find((s) => s.id === sessionId); + result.set(sessionId, { + config: state.config, + sessionName: session?.name ?? sessionId, + }); + } + return result; + }, + onDispatch: (sessionId, sub, event) => { + this.dispatchSubscription(sessionId, sub, event, sessionId); + }, + }); } /** Enable the engine and scan all sessions for Cue configs */ @@ -179,10 +157,10 @@ export class CueEngine { } // Detect sleep gap from previous heartbeat - this.detectSleepAndReconcile(); + this.heartbeat.detectSleepAndReconcile(); // Start heartbeat writer (30s interval) - this.startHeartbeat(); + this.heartbeat.start(); } /** Disable the engine, clearing all timers and watchers */ @@ -201,14 +179,13 @@ export class CueEngine { } this.pendingYamlWatchers.clear(); - // Clear concurrency state - this.eventQueue.clear(); - this.activeRunCount.clear(); - this.manuallyStoppedRuns.clear(); + // Clear concurrency and fan-in state + this.runManager.reset(); + this.fanInTracker.reset(); this.scheduledFiredKeys.clear(); // Stop heartbeat and close database - this.stopHeartbeat(); + this.heartbeat.stop(); try { closeCueDb(); } catch { @@ -262,8 +239,7 @@ export class CueEngine { removeSession(sessionId: string): void { this.teardownSession(sessionId); this.sessions.delete(sessionId); - this.clearQueue(sessionId); - this.activeRunCount.delete(sessionId); + this.runManager.clearQueue(sessionId); const pendingWatcher = this.pendingYamlWatchers.get(sessionId); if (pendingWatcher) { @@ -287,9 +263,7 @@ export class CueEngine { reportedSessionIds.add(sessionId); - const activeRunCount = [...this.activeRuns.values()].filter( - (r) => r.result.sessionId === sessionId - ).length; + const activeRunCount = this.runManager.getActiveRunCount(sessionId); let nextTrigger: string | undefined; if (state.nextTriggers.size > 0) { @@ -338,46 +312,23 @@ export class CueEngine { /** Returns currently running Cue executions */ getActiveRuns(): CueRunResult[] { - return [...this.activeRuns.values()].map((r) => r.result); + return this.runManager.getActiveRuns(); } /** Returns recent completed/failed runs */ getActivityLog(limit?: number): CueRunResult[] { - if (limit !== undefined) { - return this.activityLog.slice(-limit); - } - return [...this.activityLog]; + return this.activityLog.getAll(limit); } /** Stops a specific running execution */ stopRun(runId: string): boolean { - const run = this.activeRuns.get(runId); - if (!run) return false; - - this.manuallyStoppedRuns.add(runId); - this.deps.onStopCueRun?.(runId); - run.abortController?.abort(); - run.result.status = 'stopped'; - run.result.endedAt = new Date().toISOString(); - run.result.durationMs = Date.now() - new Date(run.result.startedAt).getTime(); - - this.activeRuns.delete(runId); - this.pushActivityLog(run.result); - this.deps.onLog('cue', `[CUE] Run stopped: ${runId}`, { - type: 'runStopped', - runId, - sessionId: run.result.sessionId, - subscriptionName: run.result.subscriptionName, - }); - return true; + const result = this.runManager.stopRun(runId); + return result; } /** Stops all running executions and clears all queues */ stopAll(): void { - for (const [runId] of this.activeRuns) { - this.stopRun(runId); - } - this.eventQueue.clear(); + this.runManager.stopAll(); } /** Returns master enabled state */ @@ -387,13 +338,7 @@ export class CueEngine { /** Returns queue depth per session (for the Cue Modal) */ getQueueStatus(): Map { - const result = new Map(); - for (const [sessionId, queue] of this.eventQueue) { - if (queue.length > 0) { - result.set(sessionId, queue.length); - } - } - return result; + return this.runManager.getQueueStatus(); } /** Returns the merged Cue settings from the first available session config */ @@ -476,7 +421,7 @@ export class CueEngine { /** Clears queued events for a session */ clearQueue(sessionId: string): void { - this.eventQueue.delete(sessionId); + this.runManager.clearQueue(sessionId); } /** @@ -576,9 +521,9 @@ export class CueEngine { this.dispatchSubscription(ownerSessionId, sub, event, completingName, chainDepth); } else { // Fan-in: track completions with data - this.handleFanIn( + this.fanInTracker.handleCompletion( ownerSessionId, - state, + state.config.settings, sub, sources, sessionId, @@ -592,16 +537,7 @@ export class CueEngine { /** Clear all fan-in state for a session (when Cue is disabled or session removed) */ clearFanInState(sessionId: string): void { - for (const key of [...this.fanInTrackers.keys()]) { - if (key.startsWith(`${sessionId}:`)) { - this.fanInTrackers.delete(key); - const timer = this.fanInTimers.get(key); - if (timer) { - clearTimeout(timer); - this.fanInTimers.delete(key); - } - } - } + this.fanInTracker.clearForSession(sessionId); } // --- Private methods --- @@ -642,7 +578,7 @@ export class CueEngine { fanOutIndex: i, }, }; - this.executeCueRun( + this.runManager.execute( targetSession.id, sub.prompt_file ?? sub.prompt, fanOutEvent, @@ -652,7 +588,7 @@ export class CueEngine { ); } } else { - this.executeCueRun( + this.runManager.execute( ownerSessionId, sub.prompt_file ?? sub.prompt, event, @@ -663,152 +599,6 @@ export class CueEngine { } } - /** - * Handle fan-in logic: track which sources have completed, fire when all done. - * Supports timeout handling based on the subscription's settings. - */ - private handleFanIn( - ownerSessionId: string, - state: SessionState, - sub: CueSubscription, - sources: string[], - completedSessionId: string, - completedSessionName: string, - completionData?: AgentCompletionData - ): void { - const key = `${ownerSessionId}:${sub.name}`; - - if (!this.fanInTrackers.has(key)) { - this.fanInTrackers.set(key, new Map()); - } - const tracker = this.fanInTrackers.get(key)!; - const rawOutput = completionData?.stdout ?? ''; - tracker.set(completedSessionId, { - sessionId: completedSessionId, - sessionName: completedSessionName, - output: rawOutput.slice(-SOURCE_OUTPUT_MAX_CHARS), - truncated: rawOutput.length > SOURCE_OUTPUT_MAX_CHARS, - chainDepth: completionData?.chainDepth ?? 0, - }); - - // Start timeout timer on first source completion - if (tracker.size === 1 && !this.fanInTimers.has(key)) { - const timeoutMs = (state.config.settings.timeout_minutes ?? 30) * 60 * 1000; - const timer = setTimeout(() => { - this.handleFanInTimeout(key, ownerSessionId, state, sub, sources); - }, timeoutMs); - this.fanInTimers.set(key, timer); - } - - const remaining = sources.length - tracker.size; - if (remaining > 0) { - this.deps.onLog( - 'cue', - `[CUE] Fan-in "${sub.name}": waiting for ${remaining} more session(s)` - ); - return; - } - - // All sources completed — clear timer and fire - const timer = this.fanInTimers.get(key); - if (timer) { - clearTimeout(timer); - this.fanInTimers.delete(key); - } - this.fanInTrackers.delete(key); - - const completions = [...tracker.values()]; - const event: CueEvent = { - id: crypto.randomUUID(), - type: 'agent.completed', - timestamp: new Date().toISOString(), - triggerName: sub.name, - payload: { - completedSessions: completions.map((c) => c.sessionId), - sourceSession: completions.map((c) => c.sessionName).join(', '), - sourceOutput: completions.map((c) => c.output).join('\n---\n'), - outputTruncated: completions.some((c) => c.truncated), - }, - }; - const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth)); - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (agent.completed, fan-in complete)`); - this.dispatchSubscription( - ownerSessionId, - sub, - event, - completions.map((c) => c.sessionName).join(', '), - maxChainDepth - ); - } - - /** - * Handle fan-in timeout. Behavior depends on timeout_on_fail setting: - * - 'break': log failure and clear the tracker - * - 'continue': fire the downstream subscription with partial data - */ - private handleFanInTimeout( - key: string, - ownerSessionId: string, - state: SessionState, - sub: CueSubscription, - sources: string[] - ): void { - this.fanInTimers.delete(key); - const tracker = this.fanInTrackers.get(key); - if (!tracker) return; - - const completedNames = [...tracker.values()].map((c) => c.sessionName); - const completedIds = [...tracker.keys()]; - - // Determine which sources haven't completed yet - const allSessions = this.deps.getSessions(); - const timedOutSources = sources.filter((src) => { - const session = allSessions.find((s) => s.name === src || s.id === src); - const sessionId = session?.id ?? src; - return !completedIds.includes(sessionId) && !completedIds.includes(src); - }); - - if (state.config.settings.timeout_on_fail === 'continue') { - // Fire with partial data - const completions = [...tracker.values()]; - this.fanInTrackers.delete(key); - - const event: CueEvent = { - id: crypto.randomUUID(), - type: 'agent.completed', - timestamp: new Date().toISOString(), - triggerName: sub.name, - payload: { - completedSessions: completions.map((c) => c.sessionId), - timedOutSessions: timedOutSources, - sourceSession: completions.map((c) => c.sessionName).join(', '), - sourceOutput: completions.map((c) => c.output).join('\n---\n'), - outputTruncated: completions.some((c) => c.truncated), - partial: true, - }, - }; - const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth)); - this.deps.onLog( - 'cue', - `[CUE] Fan-in "${sub.name}" timed out (continue mode) — firing with ${completedNames.length}/${sources.length} sources` - ); - this.dispatchSubscription( - ownerSessionId, - sub, - event, - completedNames.join(', '), - maxChainDepth - ); - } else { - // 'break' mode — log failure and clear - this.fanInTrackers.delete(key); - this.deps.onLog( - 'cue', - `[CUE] Fan-in "${sub.name}" timed out (break mode) — ${completedNames.length}/${sources.length} completed, waiting for: ${timedOutSources.join(', ')}` - ); - } - } - private initSession(session: SessionInfo): void { if (!this.enabled) return; @@ -847,21 +637,30 @@ export class CueEngine { } // Set up subscriptions + const setupDeps: SubscriptionSetupDeps = { + enabled: () => this.enabled, + scheduledFiredKeys: this.scheduledFiredKeys, + onLog: this.deps.onLog, + executeCueRun: (sid, prompt, event, subName, outputPrompt) => { + this.runManager.execute(sid, prompt, event, subName, outputPrompt); + }, + }; + for (const sub of config.subscriptions) { if (sub.enabled === false) continue; // Skip subscriptions bound to a different agent if (sub.agent_id && sub.agent_id !== session.id) continue; if (sub.event === 'time.heartbeat' && sub.interval_minutes) { - this.setupHeartbeatSubscription(session, state, sub); + setupHeartbeatSubscription(setupDeps, session, state, sub); } else if (sub.event === 'time.scheduled' && sub.schedule_times?.length) { - this.setupScheduledSubscription(session, state, sub); + setupScheduledSubscription(setupDeps, session, state, sub); } else if (sub.event === 'file.changed' && sub.watch) { - this.setupFileWatcherSubscription(session, state, sub); + setupFileWatcherSubscription(setupDeps, session, state, sub); } else if (sub.event === 'task.pending' && sub.watch) { - this.setupTaskScannerSubscription(session, state, sub); + setupTaskScannerSubscription(setupDeps, session, state, sub); } else if (sub.event === 'github.pull_request' || sub.event === 'github.issue') { - this.setupGitHubPollerSubscription(session, state, sub); + setupGitHubPollerSubscription(setupDeps, session, state, sub); } // agent.completed subscriptions are handled reactively via notifyAgentCompleted } @@ -873,604 +672,8 @@ export class CueEngine { ); } - private setupHeartbeatSubscription( - session: SessionInfo, - state: SessionState, - sub: { - name: string; - prompt: string; - prompt_file?: string; - output_prompt?: string; - interval_minutes?: number; - filter?: Record; - } - ): void { - const intervalMs = (sub.interval_minutes ?? 0) * 60 * 1000; - if (intervalMs <= 0) return; - - // Fire immediately on first setup - const immediateEvent: CueEvent = { - id: crypto.randomUUID(), - type: 'time.heartbeat', - timestamp: new Date().toISOString(), - triggerName: sub.name, - payload: { interval_minutes: sub.interval_minutes }, - }; - - // Check payload filter (even for timer events) - if (!sub.filter || matchesFilter(immediateEvent.payload, sub.filter)) { - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.heartbeat, initial)`); - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - immediateEvent, - sub.name, - sub.output_prompt - ); - } else { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - } - - // Then on the interval - const timer = setInterval(() => { - if (!this.enabled) return; - - const event: CueEvent = { - id: crypto.randomUUID(), - type: 'time.heartbeat', - timestamp: new Date().toISOString(), - triggerName: sub.name, - payload: { interval_minutes: sub.interval_minutes }, - }; - - // Check payload filter - if (sub.filter && !matchesFilter(event.payload, sub.filter)) { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - return; - } - - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.heartbeat)`); - state.lastTriggered = event.timestamp; - state.nextTriggers.set(sub.name, Date.now() + intervalMs); - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }, intervalMs); - - state.nextTriggers.set(sub.name, Date.now() + intervalMs); - state.timers.push(timer); - } - - private setupScheduledSubscription( - session: SessionInfo, - state: SessionState, - sub: { - name: string; - prompt: string; - prompt_file?: string; - output_prompt?: string; - schedule_times?: string[]; - schedule_days?: string[]; - filter?: Record; - } - ): void { - const times = sub.schedule_times ?? []; - if (times.length === 0) return; - - const checkAndFire = () => { - if (!this.enabled) return; - - const now = new Date(); - const dayNames = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat']; - const currentDay = dayNames[now.getDay()]; - const currentTime = `${String(now.getHours()).padStart(2, '0')}:${String(now.getMinutes()).padStart(2, '0')}`; - - // Check day filter (if specified, current day must match) - if (sub.schedule_days && sub.schedule_days.length > 0) { - if (!sub.schedule_days.includes(currentDay)) { - return; - } - } - - // Check if current time matches any scheduled time - if (!times.includes(currentTime)) { - // Evict stale fired-keys from previous minutes - for (const key of this.scheduledFiredKeys) { - if (key.startsWith(`${session.id}:${sub.name}:`) && !key.endsWith(`:${currentTime}`)) { - this.scheduledFiredKeys.delete(key); - } - } - return; - } - - // Guard against double-fire (e.g., config refresh within the same minute) - const firedKey = `${session.id}:${sub.name}:${currentTime}`; - if (this.scheduledFiredKeys.has(firedKey)) { - return; - } - this.scheduledFiredKeys.add(firedKey); - - const event: CueEvent = { - id: crypto.randomUUID(), - type: 'time.scheduled', - timestamp: now.toISOString(), - triggerName: sub.name, - payload: { - schedule_times: sub.schedule_times, - schedule_days: sub.schedule_days, - matched_time: currentTime, - matched_day: currentDay, - }, - }; - - if (sub.filter && !matchesFilter(event.payload, sub.filter)) { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - return; - } - - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.scheduled, ${currentTime})`); - state.lastTriggered = event.timestamp; - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }; - - // Check every 60 seconds to catch scheduled times - const timer = setInterval(checkAndFire, 60_000); - state.timers.push(timer); - - // Calculate and track the next trigger time - const nextMs = calculateNextScheduledTime(times, sub.schedule_days); - if (nextMs != null) { - state.nextTriggers.set(sub.name, nextMs); - } - } - - private setupFileWatcherSubscription( - session: SessionInfo, - state: SessionState, - sub: { - name: string; - prompt: string; - prompt_file?: string; - output_prompt?: string; - watch?: string; - filter?: Record; - } - ): void { - if (!sub.watch) return; - - const cleanup = createCueFileWatcher({ - watchGlob: sub.watch, - projectRoot: session.projectRoot, - debounceMs: DEFAULT_FILE_DEBOUNCE_MS, - triggerName: sub.name, - onLog: (level, message) => this.deps.onLog(level as MainLogLevel, message), - onEvent: (event) => { - if (!this.enabled) return; - - // Check payload filter - if (sub.filter && !matchesFilter(event.payload, sub.filter)) { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - return; - } - - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (file.changed)`); - state.lastTriggered = event.timestamp; - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }, - }); - - state.watchers.push(cleanup); - } - - private setupGitHubPollerSubscription( - session: SessionInfo, - state: SessionState, - sub: CueSubscription - ): void { - const cleanup = createCueGitHubPoller({ - eventType: sub.event as 'github.pull_request' | 'github.issue', - repo: sub.repo, - pollMinutes: sub.poll_minutes ?? 5, - projectRoot: session.projectRoot, - triggerName: sub.name, - subscriptionId: `${session.id}:${sub.name}`, - ghState: sub.gh_state, - onLog: (level, message) => this.deps.onLog(level as MainLogLevel, message), - onEvent: (event) => { - if (!this.enabled) return; - - // Check payload filter - if (sub.filter && !matchesFilter(event.payload, sub.filter)) { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - return; - } - - this.deps.onLog('cue', `[CUE] "${sub.name}" triggered (${sub.event})`); - state.lastTriggered = event.timestamp; - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }, - }); - - state.watchers.push(cleanup); - } - - private setupTaskScannerSubscription( - session: SessionInfo, - state: SessionState, - sub: CueSubscription - ): void { - if (!sub.watch) return; - - const cleanup = createCueTaskScanner({ - watchGlob: sub.watch, - pollMinutes: sub.poll_minutes ?? 1, - projectRoot: session.projectRoot, - triggerName: sub.name, - onLog: (level, message) => this.deps.onLog(level as MainLogLevel, message), - onEvent: (event) => { - if (!this.enabled) return; - - // Check payload filter - if (sub.filter && !matchesFilter(event.payload, sub.filter)) { - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})` - ); - return; - } - - this.deps.onLog( - 'cue', - `[CUE] "${sub.name}" triggered (task.pending: ${event.payload.taskCount} task(s) in ${event.payload.filename})` - ); - state.lastTriggered = event.timestamp; - this.executeCueRun( - session.id, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }, - }); - - state.watchers.push(cleanup); - } - - /** - * Gate for concurrency control. Checks if a slot is available for this session. - * If at limit, queues the event. Otherwise dispatches immediately. - */ - private executeCueRun( - sessionId: string, - prompt: string, - event: CueEvent, - subscriptionName: string, - outputPrompt?: string, - chainDepth?: number - ): void { - // Look up the config for this session to get concurrency settings - const state = this.sessions.get(sessionId); - const maxConcurrent = state?.config.settings.max_concurrent ?? 1; - const queueSize = state?.config.settings.queue_size ?? 10; - const currentCount = this.activeRunCount.get(sessionId) ?? 0; - - if (currentCount >= maxConcurrent) { - // At concurrency limit — queue the event - const sessionName = - this.deps.getSessions().find((s) => s.id === sessionId)?.name ?? sessionId; - if (!this.eventQueue.has(sessionId)) { - this.eventQueue.set(sessionId, []); - } - const queue = this.eventQueue.get(sessionId)!; - - if (queue.length >= queueSize) { - // Drop the oldest entry - queue.shift(); - this.deps.onLog('cue', `[CUE] Queue full for "${sessionName}", dropping oldest event`); - } - - queue.push({ - event, - subscription: { name: subscriptionName, event: event.type, enabled: true, prompt }, - prompt, - outputPrompt, - subscriptionName, - queuedAt: Date.now(), - chainDepth, - }); - - this.deps.onLog( - 'cue', - `[CUE] Event queued for "${sessionName}" (${queue.length}/${queueSize} in queue, ${currentCount}/${maxConcurrent} concurrent)` - ); - return; - } - - // Slot available — dispatch immediately - this.activeRunCount.set(sessionId, currentCount + 1); - this.doExecuteCueRun(sessionId, prompt, event, subscriptionName, outputPrompt, chainDepth); - } - - /** - * Actually executes a Cue run. Called when a concurrency slot is available. - * - * If outputPrompt is provided, a second run is executed after the main task - * completes successfully. The output prompt receives the main task's stdout - * as context, and its output replaces the stdout passed downstream. - */ - private async doExecuteCueRun( - sessionId: string, - prompt: string, - event: CueEvent, - subscriptionName: string, - outputPrompt?: string, - chainDepth?: number - ): Promise { - const session = this.deps.getSessions().find((s) => s.id === sessionId); - const state = this.sessions.get(sessionId); - const runId = crypto.randomUUID(); - const abortController = new AbortController(); - - const result: CueRunResult = { - runId, - sessionId, - sessionName: session?.name ?? 'Unknown', - subscriptionName, - event, - status: 'running', - stdout: '', - stderr: '', - exitCode: null, - durationMs: 0, - startedAt: new Date().toISOString(), - endedAt: '', - }; - - this.activeRuns.set(runId, { result, abortController }); - const timeoutMs = (state?.config.settings.timeout_minutes ?? 30) * 60 * 1000; - try { - recordCueEvent({ - id: runId, - type: event.type, - triggerName: event.triggerName, - sessionId, - subscriptionName, - status: 'running', - payload: JSON.stringify(event.payload), - }); - } catch { - // Non-fatal if DB is unavailable - } - this.deps.onLog('cue', `[CUE] Run started: ${subscriptionName}`, { - type: 'runStarted', - runId, - sessionId, - subscriptionName, - }); - - try { - const runResult = await this.deps.onCueRun({ - runId, - sessionId, - prompt, - subscriptionName, - event, - timeoutMs, - }); - if (this.manuallyStoppedRuns.has(runId)) { - return; - } - result.status = runResult.status; - result.stdout = runResult.stdout; - result.stderr = runResult.stderr; - result.exitCode = runResult.exitCode; - - // Execute output prompt if the main task succeeded and an output prompt is configured - if (outputPrompt && result.status === 'completed') { - this.deps.onLog( - 'cue', - `[CUE] "${subscriptionName}" executing output prompt for downstream handoff` - ); - - const outputRunId = crypto.randomUUID(); - const outputEvent: CueEvent = { - ...event, - id: crypto.randomUUID(), - payload: { - ...event.payload, - sourceOutput: result.stdout.substring(0, SOURCE_OUTPUT_MAX_CHARS), - outputPromptPhase: true, - }, - }; - - try { - recordCueEvent({ - id: outputRunId, - type: event.type, - triggerName: event.triggerName, - sessionId, - subscriptionName: `${subscriptionName}:output`, - status: 'running', - payload: JSON.stringify(outputEvent.payload), - }); - } catch { - // Non-fatal if DB is unavailable - } - - const contextPrompt = `${outputPrompt}\n\n---\n\nContext from completed task:\n${result.stdout.substring(0, SOURCE_OUTPUT_MAX_CHARS)}`; - const outputResult = await this.deps.onCueRun({ - runId: outputRunId, - sessionId, - prompt: contextPrompt, - subscriptionName: `${subscriptionName}:output`, - event: outputEvent, - timeoutMs, - }); - - try { - updateCueEventStatus(outputRunId, outputResult.status); - } catch { - // Non-fatal if DB is unavailable - } - - if (this.manuallyStoppedRuns.has(runId)) { - return; - } - - if (outputResult.status === 'completed') { - result.stdout = outputResult.stdout; - } else { - this.deps.onLog( - 'cue', - `[CUE] "${subscriptionName}" output prompt failed (${outputResult.status}), using main task output` - ); - } - } - } catch (error) { - if (this.manuallyStoppedRuns.has(runId)) { - return; - } - result.status = 'failed'; - result.stderr = error instanceof Error ? error.message : String(error); - } finally { - result.endedAt = new Date().toISOString(); - result.durationMs = Date.now() - new Date(result.startedAt).getTime(); - this.activeRuns.delete(runId); - - // Decrement active run count and drain queue - const count = this.activeRunCount.get(sessionId) ?? 1; - this.activeRunCount.set(sessionId, Math.max(0, count - 1)); - this.drainQueue(sessionId); - - const wasManuallyStopped = this.manuallyStoppedRuns.has(runId); - if (wasManuallyStopped) { - try { - updateCueEventStatus(runId, 'stopped'); - } catch { - // Non-fatal if DB is unavailable - } - this.manuallyStoppedRuns.delete(runId); - } else { - this.pushActivityLog(result); - try { - updateCueEventStatus(runId, result.status); - } catch { - // Non-fatal if DB is unavailable - } - this.deps.onLog('cue', `[CUE] Run finished: ${subscriptionName} (${result.status})`, { - type: 'runFinished', - runId, - sessionId, - subscriptionName, - status: result.status, - }); - - // Emit completion event for agent completion chains - // This allows downstream subscriptions to react to this Cue run's completion - this.notifyAgentCompleted(sessionId, { - sessionName: result.sessionName, - status: result.status, - exitCode: result.exitCode, - durationMs: result.durationMs, - stdout: result.stdout, - triggeredBy: subscriptionName, - chainDepth: (chainDepth ?? 0) + 1, - }); - } - } - } - - /** - * Drain the event queue for a session, dispatching events while slots are available. - * Drops stale events that have exceeded the timeout. - */ - private drainQueue(sessionId: string): void { - const queue = this.eventQueue.get(sessionId); - if (!queue || queue.length === 0) return; - - const state = this.sessions.get(sessionId); - const maxConcurrent = state?.config.settings.max_concurrent ?? 1; - const timeoutMs = (state?.config.settings.timeout_minutes ?? 30) * 60 * 1000; - const sessionName = this.deps.getSessions().find((s) => s.id === sessionId)?.name ?? sessionId; - - while (queue.length > 0) { - const currentCount = this.activeRunCount.get(sessionId) ?? 0; - if (currentCount >= maxConcurrent) break; - - const entry = queue.shift()!; - const ageMs = Date.now() - entry.queuedAt; - - // Check for stale events - if (ageMs > timeoutMs) { - const ageMinutes = Math.round(ageMs / 60000); - this.deps.onLog( - 'cue', - `[CUE] Dropping stale queued event for "${sessionName}" (queued ${ageMinutes}m ago)` - ); - continue; - } - - // Dispatch the queued event - this.activeRunCount.set(sessionId, currentCount + 1); - this.doExecuteCueRun( - sessionId, - entry.prompt, - entry.event, - entry.subscriptionName, - entry.outputPrompt, - entry.chainDepth - ); - } - - // Clean up empty queue - if (queue.length === 0) { - this.eventQueue.delete(sessionId); - } - } - private pushActivityLog(result: CueRunResult): void { this.activityLog.push(result); - if (this.activityLog.length > ACTIVITY_LOG_MAX) { - this.activityLog = this.activityLog.slice(-ACTIVITY_LOG_MAX); - } } private teardownSession(sessionId: string): void { @@ -1502,82 +705,4 @@ export class CueEngine { } } } - - // --- Heartbeat & Sleep Detection --- - - private startHeartbeat(): void { - this.stopHeartbeat(); - try { - updateHeartbeat(); - } catch { - // Non-fatal if DB not ready - } - this.heartbeatInterval = setInterval(() => { - try { - updateHeartbeat(); - } catch { - // Non-fatal - } - }, HEARTBEAT_INTERVAL_MS); - } - - private stopHeartbeat(): void { - if (this.heartbeatInterval) { - clearInterval(this.heartbeatInterval); - this.heartbeatInterval = null; - } - } - - /** - * Check the last heartbeat to detect if the machine slept. - * If a gap >= SLEEP_THRESHOLD_MS is found, run the reconciler. - */ - private detectSleepAndReconcile(): void { - try { - const lastHeartbeat = getLastHeartbeat(); - if (lastHeartbeat === null) return; // First ever start — nothing to reconcile - - const now = Date.now(); - const gapMs = now - lastHeartbeat; - - if (gapMs < SLEEP_THRESHOLD_MS) return; - - const gapMinutes = Math.round(gapMs / 60_000); - this.deps.onLog( - 'cue', - `[CUE] Sleep detected (gap: ${gapMinutes}m). Reconciling missed events.` - ); - - // Build session info map for the reconciler - const reconcileSessions = new Map(); - const allSessions = this.deps.getSessions(); - for (const [sessionId, state] of this.sessions) { - const session = allSessions.find((s) => s.id === sessionId); - reconcileSessions.set(sessionId, { - config: state.config, - sessionName: session?.name ?? sessionId, - }); - } - - reconcileMissedTimeEvents({ - sleepStartMs: lastHeartbeat, - wakeTimeMs: now, - sessions: reconcileSessions, - onDispatch: (sessionId, sub, event) => { - this.executeCueRun( - sessionId, - sub.prompt_file ?? sub.prompt, - event, - sub.name, - sub.output_prompt - ); - }, - onLog: (level, message) => { - this.deps.onLog(level as MainLogLevel, message); - }, - }); - } catch (error) { - this.deps.onLog('warn', `[CUE] Sleep detection failed: ${error}`); - } - } } diff --git a/src/main/cue/cue-fan-in-tracker.ts b/src/main/cue/cue-fan-in-tracker.ts new file mode 100644 index 000000000..d59ad892a --- /dev/null +++ b/src/main/cue/cue-fan-in-tracker.ts @@ -0,0 +1,214 @@ +/** + * Fan-in completion tracker for the Cue Engine. + * + * Tracks multi-source agent.completed subscriptions: when a subscription + * lists multiple source_sessions, this module accumulates completions + * and fires the downstream subscription when all sources have reported + * (or on timeout, depending on the timeout_on_fail setting). + */ + +import * as crypto from 'crypto'; +import type { MainLogLevel } from '../../shared/logger-types'; +import type { SessionInfo } from '../../shared/types'; +import type { AgentCompletionData, CueEvent, CueSettings, CueSubscription } from './cue-types'; + +export const SOURCE_OUTPUT_MAX_CHARS = 5000; + +/** Stored data for a single fan-in source completion */ +export interface FanInSourceCompletion { + sessionId: string; + sessionName: string; + output: string; + truncated: boolean; + chainDepth: number; +} + +export interface CueFanInDeps { + onLog: (level: MainLogLevel, message: string, data?: unknown) => void; + getSessions: () => SessionInfo[]; + dispatchSubscription: ( + ownerSessionId: string, + sub: CueSubscription, + event: CueEvent, + sourceSessionName: string, + chainDepth?: number + ) => void; +} + +export interface CueFanInTracker { + handleCompletion( + ownerSessionId: string, + settings: CueSettings, + sub: CueSubscription, + sources: string[], + completedSessionId: string, + completedSessionName: string, + completionData?: AgentCompletionData + ): void; + clearForSession(sessionId: string): void; + reset(): void; +} + +export function createCueFanInTracker(deps: CueFanInDeps): CueFanInTracker { + const fanInTrackers = new Map>(); + const fanInTimers = new Map>(); + + function handleFanInTimeout( + key: string, + ownerSessionId: string, + settings: CueSettings, + sub: CueSubscription, + sources: string[] + ): void { + fanInTimers.delete(key); + const tracker = fanInTrackers.get(key); + if (!tracker) return; + + const completedNames = [...tracker.values()].map((c) => c.sessionName); + const completedIds = [...tracker.keys()]; + + // Determine which sources haven't completed yet + const allSessions = deps.getSessions(); + const timedOutSources = sources.filter((src) => { + const session = allSessions.find((s) => s.name === src || s.id === src); + const sessionId = session?.id ?? src; + return !completedIds.includes(sessionId) && !completedIds.includes(src); + }); + + if (settings.timeout_on_fail === 'continue') { + // Fire with partial data + const completions = [...tracker.values()]; + fanInTrackers.delete(key); + + const event: CueEvent = { + id: crypto.randomUUID(), + type: 'agent.completed', + timestamp: new Date().toISOString(), + triggerName: sub.name, + payload: { + completedSessions: completions.map((c) => c.sessionId), + timedOutSessions: timedOutSources, + sourceSession: completions.map((c) => c.sessionName).join(', '), + sourceOutput: completions.map((c) => c.output).join('\n---\n'), + outputTruncated: completions.some((c) => c.truncated), + partial: true, + }, + }; + const maxChainDepth = + completions.length > 0 ? Math.max(...completions.map((c) => c.chainDepth)) : 0; + deps.onLog( + 'cue', + `[CUE] Fan-in "${sub.name}" timed out (continue mode) — firing with ${completedNames.length}/${sources.length} sources` + ); + deps.dispatchSubscription( + ownerSessionId, + sub, + event, + completedNames.join(', '), + maxChainDepth + ); + } else { + // 'break' mode — log failure and clear + fanInTrackers.delete(key); + deps.onLog( + 'cue', + `[CUE] Fan-in "${sub.name}" timed out (break mode) — ${completedNames.length}/${sources.length} completed, waiting for: ${timedOutSources.join(', ')}` + ); + } + } + + return { + handleCompletion( + ownerSessionId: string, + settings: CueSettings, + sub: CueSubscription, + sources: string[], + completedSessionId: string, + completedSessionName: string, + completionData?: AgentCompletionData + ): void { + const key = `${ownerSessionId}:${sub.name}`; + + if (!fanInTrackers.has(key)) { + fanInTrackers.set(key, new Map()); + } + const tracker = fanInTrackers.get(key)!; + const rawOutput = completionData?.stdout ?? ''; + tracker.set(completedSessionId, { + sessionId: completedSessionId, + sessionName: completedSessionName, + output: rawOutput.slice(-SOURCE_OUTPUT_MAX_CHARS), + truncated: rawOutput.length > SOURCE_OUTPUT_MAX_CHARS, + chainDepth: completionData?.chainDepth ?? 0, + }); + + // Start timeout timer on first source completion + if (tracker.size === 1 && !fanInTimers.has(key)) { + const timeoutMs = (settings.timeout_minutes ?? 30) * 60 * 1000; + const timer = setTimeout(() => { + handleFanInTimeout(key, ownerSessionId, settings, sub, sources); + }, timeoutMs); + fanInTimers.set(key, timer); + } + + const remaining = sources.length - tracker.size; + if (remaining > 0) { + deps.onLog('cue', `[CUE] Fan-in "${sub.name}": waiting for ${remaining} more session(s)`); + return; + } + + // All sources completed — clear timer and fire + const timer = fanInTimers.get(key); + if (timer) { + clearTimeout(timer); + fanInTimers.delete(key); + } + fanInTrackers.delete(key); + + const completions = [...tracker.values()]; + const event: CueEvent = { + id: crypto.randomUUID(), + type: 'agent.completed', + timestamp: new Date().toISOString(), + triggerName: sub.name, + payload: { + completedSessions: completions.map((c) => c.sessionId), + sourceSession: completions.map((c) => c.sessionName).join(', '), + sourceOutput: completions.map((c) => c.output).join('\n---\n'), + outputTruncated: completions.some((c) => c.truncated), + }, + }; + const maxChainDepth = + completions.length > 0 ? Math.max(...completions.map((c) => c.chainDepth)) : 0; + deps.onLog('cue', `[CUE] "${sub.name}" triggered (agent.completed, fan-in complete)`); + deps.dispatchSubscription( + ownerSessionId, + sub, + event, + completions.map((c) => c.sessionName).join(', '), + maxChainDepth + ); + }, + + clearForSession(sessionId: string): void { + for (const key of [...fanInTrackers.keys()]) { + if (key.startsWith(`${sessionId}:`)) { + fanInTrackers.delete(key); + const timer = fanInTimers.get(key); + if (timer) { + clearTimeout(timer); + fanInTimers.delete(key); + } + } + } + }, + + reset(): void { + for (const timer of fanInTimers.values()) { + clearTimeout(timer); + } + fanInTrackers.clear(); + fanInTimers.clear(); + }, + }; +} diff --git a/src/main/cue/cue-heartbeat.ts b/src/main/cue/cue-heartbeat.ts new file mode 100644 index 000000000..227eda7ab --- /dev/null +++ b/src/main/cue/cue-heartbeat.ts @@ -0,0 +1,102 @@ +/** + * Heartbeat writer and sleep/wake detection for the Cue Engine. + * + * Writes a heartbeat timestamp to the Cue database every 30 seconds. + * On engine start, checks the gap since the last heartbeat; if the gap + * exceeds 2 minutes, triggers the reconciler for missed time-based events. + */ + +import type { MainLogLevel } from '../../shared/logger-types'; +import { updateHeartbeat, getLastHeartbeat } from './cue-db'; +import { reconcileMissedTimeEvents } from './cue-reconciler'; +import type { ReconcileSessionInfo } from './cue-reconciler'; +import type { CueConfig, CueEvent, CueSubscription } from './cue-types'; + +export const HEARTBEAT_INTERVAL_MS = 30_000; // 30 seconds +export const SLEEP_THRESHOLD_MS = 120_000; // 2 minutes +export const EVENT_PRUNE_AGE_MS = 7 * 24 * 60 * 60 * 1000; // 7 days + +export interface CueHeartbeatDeps { + onLog: (level: MainLogLevel, message: string, data?: unknown) => void; + getSessions: () => Map; + onDispatch: (sessionId: string, sub: CueSubscription, event: CueEvent) => void; +} + +export interface CueHeartbeat { + start(): void; + stop(): void; + /** Run sleep detection and reconciliation (also called on engine start) */ + detectSleepAndReconcile(): void; +} + +export function createCueHeartbeat(deps: CueHeartbeatDeps): CueHeartbeat { + let heartbeatInterval: ReturnType | null = null; + + function startHeartbeat(): void { + stopHeartbeat(); + try { + updateHeartbeat(); + } catch { + // Non-fatal if DB not ready + } + heartbeatInterval = setInterval(() => { + try { + updateHeartbeat(); + } catch { + // Non-fatal + } + }, HEARTBEAT_INTERVAL_MS); + } + + function stopHeartbeat(): void { + if (heartbeatInterval) { + clearInterval(heartbeatInterval); + heartbeatInterval = null; + } + } + + function detectSleepAndReconcile(): void { + try { + const lastHeartbeat = getLastHeartbeat(); + if (lastHeartbeat === null) return; // First ever start — nothing to reconcile + + const now = Date.now(); + const gapMs = now - lastHeartbeat; + + if (gapMs < SLEEP_THRESHOLD_MS) return; + + const gapMinutes = Math.round(gapMs / 60_000); + deps.onLog('cue', `[CUE] Sleep detected (gap: ${gapMinutes}m). Reconciling missed events.`); + + // Build session info map for the reconciler + const reconcileSessions = new Map(); + const sessions = deps.getSessions(); + for (const [sessionId, state] of sessions) { + reconcileSessions.set(sessionId, { + config: state.config, + sessionName: state.sessionName, + }); + } + + reconcileMissedTimeEvents({ + sleepStartMs: lastHeartbeat, + wakeTimeMs: now, + sessions: reconcileSessions, + onDispatch: (sessionId, sub, event) => { + deps.onDispatch(sessionId, sub, event); + }, + onLog: (level, message) => { + deps.onLog(level as MainLogLevel, message); + }, + }); + } catch (error) { + deps.onLog('warn', `[CUE] Sleep detection failed: ${error}`); + } + } + + return { + start: startHeartbeat, + stop: stopHeartbeat, + detectSleepAndReconcile, + }; +} diff --git a/src/main/cue/cue-run-manager.ts b/src/main/cue/cue-run-manager.ts new file mode 100644 index 000000000..8f6c9918e --- /dev/null +++ b/src/main/cue/cue-run-manager.ts @@ -0,0 +1,427 @@ +/** + * Cue Run Manager — concurrency control, queue management, and run execution. + * + * Manages the lifecycle of Cue run executions: + * - Concurrency gating (max_concurrent per session) + * - Event queuing when at concurrency limit + * - Queue draining when slots free + * - Active run tracking and stop controls + * - Output prompt execution (two-phase runs) + * - Completion event emission for chain propagation + */ + +import * as crypto from 'crypto'; +import type { MainLogLevel } from '../../shared/logger-types'; +import type { CueEvent, CueRunResult, CueSettings, CueSubscription } from './cue-types'; +import { recordCueEvent, updateCueEventStatus } from './cue-db'; +import { SOURCE_OUTPUT_MAX_CHARS } from './cue-fan-in-tracker'; + +/** Active run tracking */ +export interface ActiveRun { + result: CueRunResult; + abortController?: AbortController; +} + +/** A queued event waiting for a concurrency slot */ +export interface QueuedEvent { + event: CueEvent; + subscription: CueSubscription; + prompt: string; + outputPrompt?: string; + subscriptionName: string; + queuedAt: number; + chainDepth?: number; +} + +export interface CueRunManagerDeps { + getSessions: () => { id: string; name: string }[]; + getSessionSettings: (sessionId: string) => CueSettings | undefined; + onCueRun: (request: { + runId: string; + sessionId: string; + prompt: string; + subscriptionName: string; + event: CueEvent; + timeoutMs: number; + }) => Promise; + onStopCueRun?: (runId: string) => boolean; + onLog: (level: MainLogLevel, message: string, data?: unknown) => void; + /** Called when a run finishes naturally (completed/failed/timeout) — pushes to activity log AND triggers chain propagation */ + onRunCompleted: ( + sessionId: string, + result: CueRunResult, + subscriptionName: string, + chainDepth?: number + ) => void; + /** Called when a run is manually stopped — pushes to activity log only (no chain propagation) */ + onRunStopped: (result: CueRunResult) => void; +} + +export interface CueRunManager { + execute( + sessionId: string, + prompt: string, + event: CueEvent, + subscriptionName: string, + outputPrompt?: string, + chainDepth?: number + ): void; + stopRun(runId: string): boolean; + stopAll(): void; + getActiveRuns(): CueRunResult[]; + getActiveRunCount(sessionId: string): number; + getActiveRunMap(): Map; + getQueueStatus(): Map; + clearQueue(sessionId: string): void; + reset(): void; +} + +export function createCueRunManager(deps: CueRunManagerDeps): CueRunManager { + const activeRuns = new Map(); + const activeRunCount = new Map(); + const eventQueue = new Map(); + const manuallyStoppedRuns = new Set(); + + function getSessionName(sessionId: string): string { + return deps.getSessions().find((s) => s.id === sessionId)?.name ?? sessionId; + } + + function drainQueue(sessionId: string): void { + const queue = eventQueue.get(sessionId); + if (!queue || queue.length === 0) return; + + const settings = deps.getSessionSettings(sessionId); + const maxConcurrent = settings?.max_concurrent ?? 1; + const timeoutMs = (settings?.timeout_minutes ?? 30) * 60 * 1000; + const sessionName = getSessionName(sessionId); + + while (queue.length > 0) { + const currentCount = activeRunCount.get(sessionId) ?? 0; + if (currentCount >= maxConcurrent) break; + + const entry = queue.shift()!; + const ageMs = Date.now() - entry.queuedAt; + + // Check for stale events + if (ageMs > timeoutMs) { + const ageMinutes = Math.round(ageMs / 60000); + deps.onLog( + 'cue', + `[CUE] Dropping stale queued event for "${sessionName}" (queued ${ageMinutes}m ago)` + ); + continue; + } + + // Dispatch the queued event + activeRunCount.set(sessionId, currentCount + 1); + doExecuteCueRun( + sessionId, + entry.prompt, + entry.event, + entry.subscriptionName, + entry.outputPrompt, + entry.chainDepth + ); + } + + // Clean up empty queue + if (queue.length === 0) { + eventQueue.delete(sessionId); + } + } + + async function doExecuteCueRun( + sessionId: string, + prompt: string, + event: CueEvent, + subscriptionName: string, + outputPrompt?: string, + chainDepth?: number + ): Promise { + const sessionName = getSessionName(sessionId); + const settings = deps.getSessionSettings(sessionId); + const runId = crypto.randomUUID(); + const abortController = new AbortController(); + + const result: CueRunResult = { + runId, + sessionId, + sessionName, + subscriptionName, + event, + status: 'running', + stdout: '', + stderr: '', + exitCode: null, + durationMs: 0, + startedAt: new Date().toISOString(), + endedAt: '', + }; + + activeRuns.set(runId, { result, abortController }); + const timeoutMs = (settings?.timeout_minutes ?? 30) * 60 * 1000; + try { + recordCueEvent({ + id: runId, + type: event.type, + triggerName: event.triggerName, + sessionId, + subscriptionName, + status: 'running', + payload: JSON.stringify(event.payload), + }); + } catch { + // Non-fatal if DB is unavailable + } + deps.onLog('cue', `[CUE] Run started: ${subscriptionName}`, { + type: 'runStarted', + runId, + sessionId, + subscriptionName, + }); + + try { + const runResult = await deps.onCueRun({ + runId, + sessionId, + prompt, + subscriptionName, + event, + timeoutMs, + }); + if (manuallyStoppedRuns.has(runId)) { + return; + } + result.status = runResult.status; + result.stdout = runResult.stdout; + result.stderr = runResult.stderr; + result.exitCode = runResult.exitCode; + + // Execute output prompt if the main task succeeded and an output prompt is configured + if (outputPrompt && result.status === 'completed') { + deps.onLog( + 'cue', + `[CUE] "${subscriptionName}" executing output prompt for downstream handoff` + ); + + const outputRunId = crypto.randomUUID(); + const outputEvent: CueEvent = { + ...event, + id: crypto.randomUUID(), + payload: { + ...event.payload, + sourceOutput: result.stdout.substring(0, SOURCE_OUTPUT_MAX_CHARS), + outputPromptPhase: true, + }, + }; + + try { + recordCueEvent({ + id: outputRunId, + type: event.type, + triggerName: event.triggerName, + sessionId, + subscriptionName: `${subscriptionName}:output`, + status: 'running', + payload: JSON.stringify(outputEvent.payload), + }); + } catch { + // Non-fatal if DB is unavailable + } + + const contextPrompt = `${outputPrompt}\n\n---\n\nContext from completed task:\n${result.stdout.substring(0, SOURCE_OUTPUT_MAX_CHARS)}`; + const outputResult = await deps.onCueRun({ + runId: outputRunId, + sessionId, + prompt: contextPrompt, + subscriptionName: `${subscriptionName}:output`, + event: outputEvent, + timeoutMs, + }); + + try { + updateCueEventStatus(outputRunId, outputResult.status); + } catch { + // Non-fatal if DB is unavailable + } + + if (manuallyStoppedRuns.has(runId)) { + return; + } + + if (outputResult.status === 'completed') { + result.stdout = outputResult.stdout; + } else { + deps.onLog( + 'cue', + `[CUE] "${subscriptionName}" output prompt failed (${outputResult.status}), using main task output` + ); + } + } + } catch (error) { + if (manuallyStoppedRuns.has(runId)) { + return; + } + result.status = 'failed'; + result.stderr = error instanceof Error ? error.message : String(error); + } finally { + result.endedAt = new Date().toISOString(); + result.durationMs = Date.now() - new Date(result.startedAt).getTime(); + activeRuns.delete(runId); + + const wasManuallyStopped = manuallyStoppedRuns.has(runId); + + // Only decrement here for non-stopped runs — stopRun already decremented eagerly + if (!wasManuallyStopped) { + const count = activeRunCount.get(sessionId) ?? 1; + activeRunCount.set(sessionId, Math.max(0, count - 1)); + drainQueue(sessionId); + } + + if (wasManuallyStopped) { + try { + updateCueEventStatus(runId, 'stopped'); + } catch { + // Non-fatal if DB is unavailable + } + manuallyStoppedRuns.delete(runId); + } else { + try { + updateCueEventStatus(runId, result.status); + } catch { + // Non-fatal if DB is unavailable + } + deps.onLog('cue', `[CUE] Run finished: ${subscriptionName} (${result.status})`, { + type: 'runFinished', + runId, + sessionId, + subscriptionName, + status: result.status, + }); + + // Notify engine of completion (for activity log + chain propagation) + deps.onRunCompleted(sessionId, result, subscriptionName, chainDepth); + } + } + } + + return { + execute( + sessionId: string, + prompt: string, + event: CueEvent, + subscriptionName: string, + outputPrompt?: string, + chainDepth?: number + ): void { + const settings = deps.getSessionSettings(sessionId); + const maxConcurrent = settings?.max_concurrent ?? 1; + const queueSize = settings?.queue_size ?? 10; + const currentCount = activeRunCount.get(sessionId) ?? 0; + + if (currentCount >= maxConcurrent) { + // At concurrency limit — queue the event + const sessionName = getSessionName(sessionId); + if (!eventQueue.has(sessionId)) { + eventQueue.set(sessionId, []); + } + const queue = eventQueue.get(sessionId)!; + + if (queue.length >= queueSize) { + // Drop the oldest entry + queue.shift(); + deps.onLog('cue', `[CUE] Queue full for "${sessionName}", dropping oldest event`); + } + + queue.push({ + event, + subscription: { name: subscriptionName, event: event.type, enabled: true, prompt }, + prompt, + outputPrompt, + subscriptionName, + queuedAt: Date.now(), + chainDepth, + }); + + deps.onLog( + 'cue', + `[CUE] Event queued for "${sessionName}" (${queue.length}/${queueSize} in queue, ${currentCount}/${maxConcurrent} concurrent)` + ); + return; + } + + // Slot available — dispatch immediately + activeRunCount.set(sessionId, currentCount + 1); + doExecuteCueRun(sessionId, prompt, event, subscriptionName, outputPrompt, chainDepth); + }, + + stopRun(runId: string): boolean { + const run = activeRuns.get(runId); + if (!run) return false; + + manuallyStoppedRuns.add(runId); + deps.onStopCueRun?.(runId); + run.abortController?.abort(); + run.result.status = 'stopped'; + run.result.endedAt = new Date().toISOString(); + run.result.durationMs = Date.now() - new Date(run.result.startedAt).getTime(); + + activeRuns.delete(runId); + + // Free the concurrency slot immediately so queued events can proceed. + // The finally block in doExecuteCueRun skips its decrement for manually stopped runs. + const count = activeRunCount.get(run.result.sessionId) ?? 1; + activeRunCount.set(run.result.sessionId, Math.max(0, count - 1)); + drainQueue(run.result.sessionId); + + deps.onRunStopped(run.result); + deps.onLog('cue', `[CUE] Run stopped: ${runId}`, { + type: 'runStopped', + runId, + sessionId: run.result.sessionId, + subscriptionName: run.result.subscriptionName, + }); + return true; + }, + + stopAll(): void { + for (const runId of [...activeRuns.keys()]) { + this.stopRun(runId); + } + eventQueue.clear(); + }, + + getActiveRuns(): CueRunResult[] { + return [...activeRuns.values()].map((r) => r.result); + }, + + getActiveRunCount(sessionId: string): number { + return [...activeRuns.values()].filter((r) => r.result.sessionId === sessionId).length; + }, + + getActiveRunMap(): Map { + return activeRuns; + }, + + getQueueStatus(): Map { + const result = new Map(); + for (const [sessionId, queue] of eventQueue) { + if (queue.length > 0) { + result.set(sessionId, queue.length); + } + } + return result; + }, + + clearQueue(sessionId: string): void { + eventQueue.delete(sessionId); + }, + + reset(): void { + activeRuns.clear(); + activeRunCount.clear(); + eventQueue.clear(); + manuallyStoppedRuns.clear(); + }, + }; +} diff --git a/src/main/cue/cue-subscription-setup.ts b/src/main/cue/cue-subscription-setup.ts new file mode 100644 index 000000000..67411bea6 --- /dev/null +++ b/src/main/cue/cue-subscription-setup.ts @@ -0,0 +1,372 @@ +/** + * Subscription setup logic for the Cue Engine. + * + * Sets up event source subscriptions (timers, file watchers, pollers, task scanners) + * for a session's Cue config. Each setup method creates the necessary watchers/timers + * and wires them to the engine's event dispatch pipeline. + */ + +import * as crypto from 'crypto'; +import type { MainLogLevel } from '../../shared/logger-types'; +import type { SessionInfo } from '../../shared/types'; +import type { CueEvent, CueSubscription } from './cue-types'; +import { createCueFileWatcher } from './cue-file-watcher'; +import { createCueGitHubPoller } from './cue-github-poller'; +import { createCueTaskScanner } from './cue-task-scanner'; +import { matchesFilter, describeFilter } from './cue-filter'; + +export const DEFAULT_FILE_DEBOUNCE_MS = 5000; + +const DAY_NAMES = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'] as const; + +/** + * Calculates the next occurrence of a scheduled time. + * Returns a timestamp in ms, or null if inputs are invalid. + */ +export function calculateNextScheduledTime(times: string[], days?: string[]): number | null { + if (times.length === 0) return null; + + const now = new Date(); + const candidates: number[] = []; + + // Check up to 8 days ahead (0..7) to cover same-day-next-week when today's slot has passed + for (let dayOffset = 0; dayOffset <= 7; dayOffset++) { + const candidate = new Date(now); + candidate.setDate(candidate.getDate() + dayOffset); + const dayName = DAY_NAMES[candidate.getDay()]; + + if (days && days.length > 0 && !days.includes(dayName)) continue; + + for (const time of times) { + const [hourStr, minStr] = time.split(':'); + const hour = parseInt(hourStr, 10); + const min = parseInt(minStr, 10); + if (isNaN(hour) || isNaN(min)) continue; + + const target = new Date(candidate); + target.setHours(hour, min, 0, 0); + + if (target.getTime() > now.getTime()) { + candidates.push(target.getTime()); + } + } + } + + return candidates.length > 0 ? Math.min(...candidates) : null; +} + +/** Mutable state passed to subscription setup functions */ +export interface SubscriptionSetupState { + timers: ReturnType[]; + watchers: (() => void)[]; + lastTriggered?: string; + nextTriggers: Map; +} + +/** Dependencies for subscription setup */ +export interface SubscriptionSetupDeps { + enabled: () => boolean; + scheduledFiredKeys: Set; + onLog: (level: MainLogLevel, message: string, data?: unknown) => void; + executeCueRun: ( + sessionId: string, + prompt: string, + event: CueEvent, + subscriptionName: string, + outputPrompt?: string + ) => void; +} + +export function setupHeartbeatSubscription( + deps: SubscriptionSetupDeps, + session: SessionInfo, + state: SubscriptionSetupState, + sub: { + name: string; + prompt: string; + prompt_file?: string; + output_prompt?: string; + interval_minutes?: number; + filter?: Record; + } +): void { + const intervalMs = (sub.interval_minutes ?? 0) * 60 * 1000; + if (intervalMs <= 0) return; + + // Fire immediately on first setup + const immediateEvent: CueEvent = { + id: crypto.randomUUID(), + type: 'time.heartbeat', + timestamp: new Date().toISOString(), + triggerName: sub.name, + payload: { interval_minutes: sub.interval_minutes }, + }; + + // Check payload filter (even for timer events) + if (!sub.filter || matchesFilter(immediateEvent.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.heartbeat, initial)`); + state.lastTriggered = immediateEvent.timestamp; + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + immediateEvent, + sub.name, + sub.output_prompt + ); + } else { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + } + + // Then on the interval + const timer = setInterval(() => { + if (!deps.enabled()) return; + + const event: CueEvent = { + id: crypto.randomUUID(), + type: 'time.heartbeat', + timestamp: new Date().toISOString(), + triggerName: sub.name, + payload: { interval_minutes: sub.interval_minutes }, + }; + + // Check payload filter + if (sub.filter && !matchesFilter(event.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + return; + } + + deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.heartbeat)`); + state.lastTriggered = event.timestamp; + state.nextTriggers.set(sub.name, Date.now() + intervalMs); + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + event, + sub.name, + sub.output_prompt + ); + }, intervalMs); + + state.nextTriggers.set(sub.name, Date.now() + intervalMs); + state.timers.push(timer); +} + +export function setupScheduledSubscription( + deps: SubscriptionSetupDeps, + session: SessionInfo, + state: SubscriptionSetupState, + sub: { + name: string; + prompt: string; + prompt_file?: string; + output_prompt?: string; + schedule_times?: string[]; + schedule_days?: string[]; + filter?: Record; + } +): void { + const times = sub.schedule_times ?? []; + if (times.length === 0) return; + + const checkAndFire = () => { + if (!deps.enabled()) return; + + const now = new Date(); + const currentDay = DAY_NAMES[now.getDay()]; + const currentTime = `${String(now.getHours()).padStart(2, '0')}:${String(now.getMinutes()).padStart(2, '0')}`; + + // Check day filter (if specified, current day must match) + if (sub.schedule_days && sub.schedule_days.length > 0) { + if (!sub.schedule_days.includes(currentDay)) { + return; + } + } + + // Check if current time matches any scheduled time + if (!times.includes(currentTime)) { + // Evict stale fired-keys from previous minutes + for (const key of deps.scheduledFiredKeys) { + if (key.startsWith(`${session.id}:${sub.name}:`) && !key.endsWith(`:${currentTime}`)) { + deps.scheduledFiredKeys.delete(key); + } + } + return; + } + + // Guard against double-fire (e.g., config refresh within the same minute) + const firedKey = `${session.id}:${sub.name}:${currentTime}`; + if (deps.scheduledFiredKeys.has(firedKey)) { + return; + } + deps.scheduledFiredKeys.add(firedKey); + + const event: CueEvent = { + id: crypto.randomUUID(), + type: 'time.scheduled', + timestamp: now.toISOString(), + triggerName: sub.name, + payload: { + schedule_times: sub.schedule_times, + schedule_days: sub.schedule_days, + matched_time: currentTime, + matched_day: currentDay, + }, + }; + + // Refresh next trigger time regardless of filter outcome so the UI stays current + const nextMs = calculateNextScheduledTime(times, sub.schedule_days); + if (nextMs != null) { + state.nextTriggers.set(sub.name, nextMs); + } + + if (sub.filter && !matchesFilter(event.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + return; + } + + deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.scheduled, ${currentTime})`); + state.lastTriggered = event.timestamp; + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + event, + sub.name, + sub.output_prompt + ); + }; + + // Check every 60 seconds to catch scheduled times + const timer = setInterval(checkAndFire, 60_000); + state.timers.push(timer); + + // Calculate and track the next trigger time + const nextMs = calculateNextScheduledTime(times, sub.schedule_days); + if (nextMs != null) { + state.nextTriggers.set(sub.name, nextMs); + } +} + +export function setupFileWatcherSubscription( + deps: SubscriptionSetupDeps, + session: SessionInfo, + state: SubscriptionSetupState, + sub: { + name: string; + prompt: string; + prompt_file?: string; + output_prompt?: string; + watch?: string; + filter?: Record; + } +): void { + if (!sub.watch) return; + + const cleanup = createCueFileWatcher({ + watchGlob: sub.watch, + projectRoot: session.projectRoot, + debounceMs: DEFAULT_FILE_DEBOUNCE_MS, + triggerName: sub.name, + onLog: (level, message) => deps.onLog(level as MainLogLevel, message), + onEvent: (event) => { + if (!deps.enabled()) return; + + // Check payload filter + if (sub.filter && !matchesFilter(event.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + return; + } + + deps.onLog('cue', `[CUE] "${sub.name}" triggered (file.changed)`); + state.lastTriggered = event.timestamp; + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + event, + sub.name, + sub.output_prompt + ); + }, + }); + + state.watchers.push(cleanup); +} + +export function setupGitHubPollerSubscription( + deps: SubscriptionSetupDeps, + session: SessionInfo, + state: SubscriptionSetupState, + sub: CueSubscription +): void { + const cleanup = createCueGitHubPoller({ + eventType: sub.event as 'github.pull_request' | 'github.issue', + repo: sub.repo, + pollMinutes: sub.poll_minutes ?? 5, + projectRoot: session.projectRoot, + triggerName: sub.name, + subscriptionId: `${session.id}:${sub.name}`, + ghState: sub.gh_state, + onLog: (level, message) => deps.onLog(level as MainLogLevel, message), + onEvent: (event) => { + if (!deps.enabled()) return; + + // Check payload filter + if (sub.filter && !matchesFilter(event.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + return; + } + + deps.onLog('cue', `[CUE] "${sub.name}" triggered (${sub.event})`); + state.lastTriggered = event.timestamp; + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + event, + sub.name, + sub.output_prompt + ); + }, + }); + + state.watchers.push(cleanup); +} + +export function setupTaskScannerSubscription( + deps: SubscriptionSetupDeps, + session: SessionInfo, + state: SubscriptionSetupState, + sub: CueSubscription +): void { + if (!sub.watch) return; + + const cleanup = createCueTaskScanner({ + watchGlob: sub.watch, + pollMinutes: sub.poll_minutes ?? 1, + projectRoot: session.projectRoot, + triggerName: sub.name, + onLog: (level, message) => deps.onLog(level as MainLogLevel, message), + onEvent: (event) => { + if (!deps.enabled()) return; + + // Check payload filter + if (sub.filter && !matchesFilter(event.payload, sub.filter)) { + deps.onLog('cue', `[CUE] "${sub.name}" filter not matched (${describeFilter(sub.filter)})`); + return; + } + + deps.onLog( + 'cue', + `[CUE] "${sub.name}" triggered (task.pending: ${event.payload.taskCount} task(s) in ${event.payload.filename})` + ); + state.lastTriggered = event.timestamp; + deps.executeCueRun( + session.id, + sub.prompt_file ?? sub.prompt, + event, + sub.name, + sub.output_prompt + ); + }, + }); + + state.watchers.push(cleanup); +} diff --git a/src/renderer/components/CueGraphView.tsx b/src/renderer/components/CueGraphView.tsx deleted file mode 100644 index c53aa6ae3..000000000 --- a/src/renderer/components/CueGraphView.tsx +++ /dev/null @@ -1,1166 +0,0 @@ -/** - * CueGraphView — Canvas-based visualization of Maestro Cue subscription graph. - * - * Shows how triggers (events) connect to agents, and how agents chain to other agents - * via agent.completed subscriptions. Follows the same canvas-based rendering approach - * as the Document Graph MindMap for visual consistency. - * - * Features: - * - Trigger nodes (event sources) on the left - * - Agent nodes in the center/right - * - Edges showing subscription connections with labels - * - Pan/zoom with mouse - * - Click-and-drag to reposition individual nodes - * - Layout algorithm dropdown (Hierarchical, Force-Directed) - * - Double-click an agent node to switch focus and close the modal - */ - -import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; -import { - forceSimulation, - forceLink, - forceManyBody, - forceCenter, - forceCollide, - forceX, - forceY, - type SimulationNodeDatum, - type SimulationLinkDatum, -} from 'd3-force'; -import { RefreshCw, Network, ChevronDown } from 'lucide-react'; -import type { Theme } from '../types'; -import { useSessionStore } from '../stores/sessionStore'; - -// ============================================================================ -// Types -// ============================================================================ - -interface CueGraphViewProps { - theme: Theme; - onClose: () => void; -} - -type GraphNodeType = 'agent' | 'trigger'; - -interface GraphNode extends SimulationNodeDatum { - id: string; - type: GraphNodeType; - label: string; - sublabel: string; - sessionId?: string; - toolType?: string; - subscriptionCount?: number; - eventType?: string; - width: number; - height: number; - /** Depth in the dependency graph (0 = trigger, 1+ = agents) */ - depth?: number; -} - -interface GraphEdge extends SimulationLinkDatum { - id: string; - label: string; - sourceId: string; - targetId: string; -} - -type CueLayoutType = 'hierarchical' | 'force'; - -const LAYOUT_LABELS: Record = { - hierarchical: { name: 'Hierarchical', description: 'Left-to-right layers' }, - force: { name: 'Force', description: 'Physics simulation' }, -}; - -// ============================================================================ -// Constants -// ============================================================================ - -const CUE_TEAL = '#06b6d4'; - -const AGENT_NODE_WIDTH = 180; -const AGENT_NODE_HEIGHT = 56; -const TRIGGER_NODE_WIDTH = 160; -const TRIGGER_NODE_HEIGHT = 44; -const NODE_BORDER_RADIUS = 10; - -const EVENT_COLORS: Record = { - 'time.heartbeat': '#f59e0b', - 'time.scheduled': '#8b5cf6', - 'file.changed': '#3b82f6', - 'agent.completed': '#22c55e', - 'github.pull_request': '#a855f7', - 'github.issue': '#f97316', - 'task.pending': CUE_TEAL, -}; - -const EVENT_LABELS: Record = { - 'time.heartbeat': 'Heartbeat', - 'time.scheduled': 'Scheduled', - 'file.changed': 'File Watch', - 'agent.completed': 'Agent Done', - 'github.pull_request': 'GitHub PR', - 'github.issue': 'GitHub Issue', - 'task.pending': 'Task Queue', -}; - -// ============================================================================ -// Graph Data Builder -// ============================================================================ - -interface GraphData { - nodes: GraphNode[]; - edges: GraphEdge[]; -} - -function buildGraphData( - graphSessions: Array<{ - sessionId: string; - sessionName: string; - toolType: string; - subscriptions: Array<{ - name: string; - event: string; - enabled: boolean; - source_session?: string | string[]; - fan_out?: string[]; - watch?: string; - interval_minutes?: number; - repo?: string; - poll_minutes?: number; - }>; - }>, - allSessions: Array<{ id: string; name: string; toolType: string }> -): GraphData { - const nodes: GraphNode[] = []; - const edges: GraphEdge[] = []; - const nodeIds = new Set(); - const triggerKeys = new Map(); // composite key → node id - - // Add agent nodes for all Cue-enabled sessions - for (const gs of graphSessions) { - const nodeId = `agent:${gs.sessionId}`; - if (!nodeIds.has(nodeId)) { - nodes.push({ - id: nodeId, - type: 'agent', - label: gs.sessionName, - sublabel: gs.toolType, - sessionId: gs.sessionId, - toolType: gs.toolType, - subscriptionCount: gs.subscriptions.filter((s) => s.enabled !== false).length, - width: AGENT_NODE_WIDTH, - height: AGENT_NODE_HEIGHT, - }); - nodeIds.add(nodeId); - } - } - - // Helper to ensure an agent node exists (for referenced agents not in graph sessions) - function ensureAgentNode(sessionName: string) { - // Try to find by name in graphSessions first - const gs = graphSessions.find((s) => s.sessionName === sessionName); - if (gs) return `agent:${gs.sessionId}`; - - // Try to find in all sessions - const session = allSessions.find((s) => s.name === sessionName); - const nodeId = session ? `agent:${session.id}` : `agent:ref:${sessionName}`; - - if (!nodeIds.has(nodeId)) { - nodes.push({ - id: nodeId, - type: 'agent', - label: sessionName, - sublabel: session?.toolType ?? 'unknown', - sessionId: session?.id, - toolType: session?.toolType, - width: AGENT_NODE_WIDTH, - height: AGENT_NODE_HEIGHT, - }); - nodeIds.add(nodeId); - } - return nodeId; - } - - // Process subscriptions - for (const gs of graphSessions) { - const agentNodeId = `agent:${gs.sessionId}`; - - for (const sub of gs.subscriptions) { - if (sub.enabled === false) continue; - - if (sub.event === 'agent.completed') { - // Agent → Agent connection - const sources = Array.isArray(sub.source_session) - ? sub.source_session - : sub.source_session - ? [sub.source_session] - : []; - - for (const sourceName of sources) { - const sourceNodeId = ensureAgentNode(sourceName); - edges.push({ - id: `edge:${sourceNodeId}→${agentNodeId}:${sub.name}`, - source: sourceNodeId, - target: agentNodeId, - sourceId: sourceNodeId, - targetId: agentNodeId, - label: sub.name, - }); - } - - // Also handle fan_out targets - if (sub.fan_out) { - for (const targetName of sub.fan_out) { - const targetNodeId = ensureAgentNode(targetName); - edges.push({ - id: `edge:${agentNodeId}→${targetNodeId}:${sub.name}:fanout`, - source: agentNodeId, - target: targetNodeId, - sourceId: agentNodeId, - targetId: targetNodeId, - label: `${sub.name} (fan-out)`, - }); - } - } - } else { - // Trigger → Agent connection - const triggerDetail = getTriggerDetail(sub); - const triggerKey = `${sub.event}:${triggerDetail}`; - - let triggerNodeId = triggerKeys.get(triggerKey); - if (!triggerNodeId) { - triggerNodeId = `trigger:${triggerKey}`; - triggerKeys.set(triggerKey, triggerNodeId); - - nodes.push({ - id: triggerNodeId, - type: 'trigger', - label: EVENT_LABELS[sub.event] ?? sub.event, - sublabel: triggerDetail, - eventType: sub.event, - width: TRIGGER_NODE_WIDTH, - height: TRIGGER_NODE_HEIGHT, - }); - nodeIds.add(triggerNodeId); - } - - // Edge from trigger to this agent - edges.push({ - id: `edge:${triggerNodeId}→${agentNodeId}:${sub.name}`, - source: triggerNodeId, - target: agentNodeId, - sourceId: triggerNodeId, - targetId: agentNodeId, - label: sub.name, - }); - - // Handle fan_out for non-agent.completed events - if (sub.fan_out) { - for (const targetName of sub.fan_out) { - const targetNodeId = ensureAgentNode(targetName); - edges.push({ - id: `edge:${triggerNodeId}→${targetNodeId}:${sub.name}:fanout`, - source: triggerNodeId, - target: targetNodeId, - sourceId: triggerNodeId, - targetId: targetNodeId, - label: `${sub.name} (fan-out)`, - }); - } - } - } - } - } - - return { nodes, edges }; -} - -function getTriggerDetail(sub: { - event: string; - watch?: string; - interval_minutes?: number; - repo?: string; - poll_minutes?: number; -}): string { - switch (sub.event) { - case 'time.heartbeat': - return sub.interval_minutes ? `${sub.interval_minutes}m` : 'heartbeat'; - case 'file.changed': - return sub.watch ?? '**/*'; - case 'github.pull_request': - case 'github.issue': - return sub.repo ?? 'repo'; - case 'task.pending': - return sub.watch ?? 'tasks'; - default: - return sub.event; - } -} - -// ============================================================================ -// Layout: Hierarchical (left-to-right layers) -// ============================================================================ - -function layoutHierarchical( - nodes: GraphNode[], - edges: GraphEdge[], - width: number, - height: number -): void { - if (nodes.length === 0) return; - - // Build adjacency for depth calculation (source → targets) - const outgoing = new Map(); - for (const edge of edges) { - const srcId = typeof edge.source === 'string' ? edge.source : (edge.source as GraphNode).id; - const tgtId = typeof edge.target === 'string' ? edge.target : (edge.target as GraphNode).id; - if (!outgoing.has(srcId)) outgoing.set(srcId, []); - outgoing.get(srcId)!.push(tgtId); - } - - // Assign depth via BFS from triggers (depth 0) - const depthMap = new Map(); - const triggers = nodes.filter((n) => n.type === 'trigger'); - const agents = nodes.filter((n) => n.type === 'agent'); - - // All triggers are depth 0 - for (const t of triggers) { - depthMap.set(t.id, 0); - } - - // BFS to assign depths to agents - const queue: string[] = triggers.map((t) => t.id); - while (queue.length > 0) { - const current = queue.shift()!; - const currentDepth = depthMap.get(current) ?? 0; - const targets = outgoing.get(current) ?? []; - for (const targetId of targets) { - const existingDepth = depthMap.get(targetId); - const newDepth = currentDepth + 1; - if (existingDepth === undefined || newDepth > existingDepth) { - depthMap.set(targetId, newDepth); - queue.push(targetId); - } - } - } - - // Agents without edges get depth 1 - for (const a of agents) { - if (!depthMap.has(a.id)) { - depthMap.set(a.id, 1); - } - } - - // Store depth on nodes - for (const node of nodes) { - node.depth = depthMap.get(node.id) ?? 0; - } - - // Group nodes by depth - const layers = new Map(); - for (const node of nodes) { - const d = node.depth!; - if (!layers.has(d)) layers.set(d, []); - layers.get(d)!.push(node); - } - - const sortedDepths = Array.from(layers.keys()).sort((a, b) => a - b); - const numLayers = sortedDepths.length; - if (numLayers === 0) return; - - // Calculate horizontal spacing - const horizontalPadding = 80; - const availableWidth = width - horizontalPadding * 2; - const layerSpacing = numLayers > 1 ? availableWidth / (numLayers - 1) : 0; - - // Position each layer - for (let i = 0; i < sortedDepths.length; i++) { - const depth = sortedDepths[i]; - const layerNodes = layers.get(depth)!; - - // X position for this layer - const layerX = numLayers === 1 ? width / 2 : horizontalPadding + i * layerSpacing; - - // Vertical spacing within layer - const verticalPadding = 40; - const maxNodeHeight = Math.max(...layerNodes.map((n) => n.height)); - const nodeSpacing = maxNodeHeight + 30; - const totalLayerHeight = layerNodes.length * nodeSpacing - 30; - const startY = (height - totalLayerHeight) / 2 + verticalPadding / 2; - - for (let j = 0; j < layerNodes.length; j++) { - layerNodes[j].x = layerX; - layerNodes[j].y = Math.max(verticalPadding, startY + j * nodeSpacing); - } - } -} - -// ============================================================================ -// Layout: Force-Directed (d3-force) -// ============================================================================ - -function layoutForce(nodes: GraphNode[], edges: GraphEdge[], width: number, height: number): void { - if (nodes.length === 0) return; - - // Seed initial positions: triggers left, agents right - for (const node of nodes) { - if (node.type === 'trigger') { - node.x = width * 0.25 + (Math.random() - 0.5) * 100; - node.y = height * 0.5 + (Math.random() - 0.5) * 200; - } else { - node.x = width * 0.7 + (Math.random() - 0.5) * 100; - node.y = height * 0.5 + (Math.random() - 0.5) * 200; - } - } - - const simulation = forceSimulation(nodes) - .force( - 'link', - forceLink(edges) - .id((d) => d.id) - .distance(220) - .strength(0.5) - ) - .force('charge', forceManyBody().strength(-500)) - .force('center', forceCenter(width / 2, height / 2)) - .force( - 'collide', - forceCollide().radius((d) => Math.max(d.width, d.height) * 0.8) - ) - .force( - 'x', - forceX() - .x((d) => (d.type === 'trigger' ? width * 0.25 : width * 0.7)) - .strength(0.15) - ) - .force('y', forceY(height / 2).strength(0.05)) - .stop(); - - // Run simulation synchronously - for (let i = 0; i < 300; i++) { - simulation.tick(); - } -} - -// ============================================================================ -// Layout dispatcher -// ============================================================================ - -function layoutGraph( - layoutType: CueLayoutType, - nodes: GraphNode[], - edges: GraphEdge[], - width: number, - height: number -): void { - if (layoutType === 'hierarchical') { - layoutHierarchical(nodes, edges, width, height); - } else { - layoutForce(nodes, edges, width, height); - } -} - -// ============================================================================ -// Canvas Rendering -// ============================================================================ - -function roundRect( - ctx: CanvasRenderingContext2D, - x: number, - y: number, - w: number, - h: number, - r: number -): void { - ctx.beginPath(); - ctx.moveTo(x + r, y); - ctx.lineTo(x + w - r, y); - ctx.quadraticCurveTo(x + w, y, x + w, y + r); - ctx.lineTo(x + w, y + h - r); - ctx.quadraticCurveTo(x + w, y + h, x + w - r, y + h); - ctx.lineTo(x + r, y + h); - ctx.quadraticCurveTo(x, y + h, x, y + h - r); - ctx.lineTo(x, y + r); - ctx.quadraticCurveTo(x, y, x + r, y); - ctx.closePath(); -} - -function truncateText(ctx: CanvasRenderingContext2D, text: string, maxWidth: number): string { - if (ctx.measureText(text).width <= maxWidth) return text; - let truncated = text; - while (truncated.length > 0 && ctx.measureText(truncated + '...').width > maxWidth) { - truncated = truncated.slice(0, -1); - } - return truncated + '...'; -} - -function drawArrowhead( - ctx: CanvasRenderingContext2D, - toX: number, - toY: number, - angle: number, - size: number, - color: string -): void { - ctx.fillStyle = color; - ctx.beginPath(); - ctx.moveTo(toX, toY); - ctx.lineTo( - toX - size * Math.cos(angle - Math.PI / 6), - toY - size * Math.sin(angle - Math.PI / 6) - ); - ctx.lineTo( - toX - size * Math.cos(angle + Math.PI / 6), - toY - size * Math.sin(angle + Math.PI / 6) - ); - ctx.closePath(); - ctx.fill(); -} - -function renderGraph( - ctx: CanvasRenderingContext2D, - nodes: GraphNode[], - edges: GraphEdge[], - theme: Theme, - transform: { zoom: number; panX: number; panY: number }, - hoveredNodeId: string | null, - selectedNodeId: string | null, - draggingNodeId: string | null, - canvasWidth: number, - canvasHeight: number -): void { - const dpr = window.devicePixelRatio || 1; - - ctx.setTransform(dpr, 0, 0, dpr, 0, 0); - ctx.clearRect(0, 0, canvasWidth, canvasHeight); - - // Apply transform - ctx.save(); - ctx.translate(transform.panX, transform.panY); - ctx.scale(transform.zoom, transform.zoom); - - // Draw edges - for (const edge of edges) { - const source = edge.source as GraphNode; - const target = edge.target as GraphNode; - if (!source.x || !source.y || !target.x || !target.y) continue; - - const sx = source.x + source.width / 2; - const sy = source.y; - const tx = target.x - target.width / 2; - const ty = target.y; - - // Determine edge color based on source type - const edgeColor = - source.type === 'trigger' - ? (EVENT_COLORS[source.eventType ?? ''] ?? theme.colors.textDim) - : '#22c55e'; - - ctx.strokeStyle = edgeColor + '80'; - ctx.lineWidth = 2; - ctx.setLineDash([]); - - // Bezier curve - const dx = Math.abs(tx - sx); - const controlOffset = Math.min(dx * 0.4, 120); - - ctx.beginPath(); - ctx.moveTo(sx, sy); - ctx.bezierCurveTo(sx + controlOffset, sy, tx - controlOffset, ty, tx, ty); - ctx.stroke(); - - // Arrowhead - const angle = Math.atan2(ty - (ty - 0), tx - (tx - controlOffset)); - drawArrowhead(ctx, tx, ty, angle, 8, edgeColor + 'cc'); - - // Edge label - const midX = (sx + tx) / 2; - const midY = (sy + ty) / 2 - 8; - ctx.font = '10px -apple-system, BlinkMacSystemFont, sans-serif'; - ctx.fillStyle = theme.colors.textDim; - ctx.textAlign = 'center'; - ctx.textBaseline = 'middle'; - const labelText = truncateText(ctx, edge.label, 120); - ctx.fillText(labelText, midX, midY); - } - - // Draw nodes - for (const node of nodes) { - if (node.x === undefined || node.y === undefined) continue; - - const nx = node.x - node.width / 2; - const ny = node.y - node.height / 2; - const isHovered = hoveredNodeId === node.id; - const isSelected = selectedNodeId === node.id; - const isDragging = draggingNodeId === node.id; - - if (node.type === 'trigger') { - // Trigger node - pill shape with event color - const color = EVENT_COLORS[node.eventType ?? ''] ?? CUE_TEAL; - - roundRect(ctx, nx, ny, node.width, node.height, NODE_BORDER_RADIUS); - ctx.fillStyle = color + '18'; - ctx.fill(); - ctx.strokeStyle = isHovered || isSelected || isDragging ? color : color + '60'; - ctx.lineWidth = isHovered || isSelected || isDragging ? 2 : 1; - ctx.stroke(); - - // Event type label - ctx.font = 'bold 11px -apple-system, BlinkMacSystemFont, sans-serif'; - ctx.fillStyle = color; - ctx.textAlign = 'center'; - ctx.textBaseline = 'middle'; - ctx.fillText(node.label, node.x, node.y - 7); - - // Detail - ctx.font = '10px -apple-system, BlinkMacSystemFont, sans-serif'; - ctx.fillStyle = theme.colors.textDim; - const detailText = truncateText(ctx, node.sublabel, node.width - 20); - ctx.fillText(detailText, node.x, node.y + 8); - } else { - // Agent node - card style - const isAgentCompleted = edges.some( - (e) => (e.source as GraphNode).type === 'agent' && (e.target as GraphNode).id === node.id - ); - const accentColor = isAgentCompleted ? '#22c55e' : CUE_TEAL; - - // Background - roundRect(ctx, nx, ny, node.width, node.height, NODE_BORDER_RADIUS); - ctx.fillStyle = theme.colors.bgActivity; - ctx.fill(); - - // Border - ctx.strokeStyle = isHovered || isSelected || isDragging ? accentColor : theme.colors.border; - ctx.lineWidth = isHovered || isSelected || isDragging ? 2 : 1; - ctx.stroke(); - - // Accent bar at top - ctx.save(); - ctx.beginPath(); - ctx.moveTo(nx + NODE_BORDER_RADIUS, ny); - ctx.lineTo(nx + node.width - NODE_BORDER_RADIUS, ny); - ctx.quadraticCurveTo(nx + node.width, ny, nx + node.width, ny + NODE_BORDER_RADIUS); - ctx.lineTo(nx + node.width, ny + 4); - ctx.lineTo(nx, ny + 4); - ctx.lineTo(nx, ny + NODE_BORDER_RADIUS); - ctx.quadraticCurveTo(nx, ny, nx + NODE_BORDER_RADIUS, ny); - ctx.closePath(); - ctx.fillStyle = accentColor; - ctx.fill(); - ctx.restore(); - - // Agent name - ctx.font = 'bold 12px -apple-system, BlinkMacSystemFont, sans-serif'; - ctx.fillStyle = theme.colors.textMain; - ctx.textAlign = 'center'; - ctx.textBaseline = 'middle'; - const nameText = truncateText(ctx, node.label, node.width - 20); - ctx.fillText(nameText, node.x, node.y - 4); - - // Tool type - ctx.font = '10px -apple-system, BlinkMacSystemFont, sans-serif'; - ctx.fillStyle = theme.colors.textDim; - ctx.fillText(node.sublabel, node.x, node.y + 12); - - // Subscription count badge - if (node.subscriptionCount && node.subscriptionCount > 0) { - const badgeText = `${node.subscriptionCount}`; - ctx.font = 'bold 9px -apple-system, BlinkMacSystemFont, sans-serif'; - const badgeWidth = Math.max(ctx.measureText(badgeText).width + 8, 18); - const badgeX = nx + node.width - badgeWidth - 6; - const badgeY = ny + node.height - 16; - - roundRect(ctx, badgeX, badgeY, badgeWidth, 14, 7); - ctx.fillStyle = accentColor; - ctx.fill(); - - ctx.fillStyle = '#fff'; - ctx.textAlign = 'center'; - ctx.textBaseline = 'middle'; - ctx.fillText(badgeText, badgeX + badgeWidth / 2, badgeY + 7); - } - } - } - - ctx.restore(); -} - -// ============================================================================ -// Hit Testing -// ============================================================================ - -function hitTest( - nodes: GraphNode[], - x: number, - y: number, - transform: { zoom: number; panX: number; panY: number } -): GraphNode | null { - // Convert screen coords to graph coords - const gx = (x - transform.panX) / transform.zoom; - const gy = (y - transform.panY) / transform.zoom; - - // Check nodes in reverse order (top-most first) - for (let i = nodes.length - 1; i >= 0; i--) { - const node = nodes[i]; - if (node.x === undefined || node.y === undefined) continue; - - const nx = node.x - node.width / 2; - const ny = node.y - node.height / 2; - - if (gx >= nx && gx <= nx + node.width && gy >= ny && gy <= ny + node.height) { - return node; - } - } - - return null; -} - -// ============================================================================ -// Component -// ============================================================================ - -export function CueGraphView({ theme, onClose }: CueGraphViewProps) { - const canvasRef = useRef(null); - const containerRef = useRef(null); - const [graphData, setGraphData] = useState(null); - const [loading, setLoading] = useState(true); - const [dimensions, setDimensions] = useState({ width: 800, height: 500 }); - const [hoveredNodeId, setHoveredNodeId] = useState(null); - const [selectedNodeId, setSelectedNodeId] = useState(null); - const [layoutType, setLayoutType] = useState('hierarchical'); - const [showLayoutDropdown, setShowLayoutDropdown] = useState(false); - - const transformRef = useRef({ zoom: 1, panX: 0, panY: 0 }); - const isPanningRef = useRef(false); - const lastMouseRef = useRef({ x: 0, y: 0 }); - const rafRef = useRef(0); - - // Node dragging state - const draggingNodeRef = useRef<{ - nodeId: string; - startX: number; - startY: number; - mouseX: number; - mouseY: number; - } | null>(null); - const [draggingNodeId, setDraggingNodeId] = useState(null); - const nodePositionOverrides = useRef>(new Map()); - - const sessions = useSessionStore((state) => state.sessions); - const setActiveSessionId = useSessionStore((state) => state.setActiveSessionId); - - // Fetch graph data - const fetchGraphData = useCallback(async () => { - setLoading(true); - try { - const data = await window.maestro.cue.getGraphData(); - const allSessionsSimple = sessions.map((s) => ({ - id: s.id, - name: s.name, - toolType: s.toolType, - })); - const graph = buildGraphData(data, allSessionsSimple); - layoutGraph(layoutType, graph.nodes, graph.edges, dimensions.width, dimensions.height); - // Clear position overrides on fresh data - nodePositionOverrides.current.clear(); - setGraphData(graph); - } catch { - setGraphData({ nodes: [], edges: [] }); - } finally { - setLoading(false); - } - }, [sessions, dimensions.width, dimensions.height, layoutType]); - - useEffect(() => { - fetchGraphData(); - }, [fetchGraphData]); - - // Observe container size - useEffect(() => { - const container = containerRef.current; - if (!container) return; - - const observer = new ResizeObserver((entries) => { - for (const entry of entries) { - const { width, height } = entry.contentRect; - if (width > 0 && height > 0) { - setDimensions({ width, height }); - } - } - }); - - observer.observe(container); - return () => observer.disconnect(); - }, []); - - // Re-layout when dimensions or layout type change - useEffect(() => { - if (graphData && graphData.nodes.length > 0) { - layoutGraph( - layoutType, - graphData.nodes, - graphData.edges, - dimensions.width, - dimensions.height - ); - // Apply position overrides - for (const node of graphData.nodes) { - const override = nodePositionOverrides.current.get(node.id); - if (override) { - node.x = override.x; - node.y = override.y; - } - } - // Center the transform - transformRef.current = { zoom: 1, panX: 0, panY: 0 }; - requestDraw(); - } - }, [dimensions.width, dimensions.height, graphData, layoutType]); - - // Canvas setup and rendering - const requestDraw = useCallback(() => { - if (rafRef.current) cancelAnimationFrame(rafRef.current); - rafRef.current = requestAnimationFrame(() => { - const canvas = canvasRef.current; - if (!canvas || !graphData) return; - - const ctx = canvas.getContext('2d'); - if (!ctx) return; - - renderGraph( - ctx, - graphData.nodes, - graphData.edges, - theme, - transformRef.current, - hoveredNodeId, - selectedNodeId, - draggingNodeId, - dimensions.width, - dimensions.height - ); - }); - }, [graphData, theme, hoveredNodeId, selectedNodeId, draggingNodeId, dimensions]); - - useEffect(() => { - requestDraw(); - }, [requestDraw]); - - // Set canvas size with DPR - useEffect(() => { - const canvas = canvasRef.current; - if (!canvas) return; - const dpr = window.devicePixelRatio || 1; - canvas.width = dimensions.width * dpr; - canvas.height = dimensions.height * dpr; - canvas.style.width = `${dimensions.width}px`; - canvas.style.height = `${dimensions.height}px`; - requestDraw(); - }, [dimensions, requestDraw]); - - // Close layout dropdown on click outside - useEffect(() => { - if (!showLayoutDropdown) return; - const handleClick = () => setShowLayoutDropdown(false); - document.addEventListener('click', handleClick); - return () => document.removeEventListener('click', handleClick); - }, [showLayoutDropdown]); - - // Mouse handlers - const handleMouseDown = useCallback( - (e: React.MouseEvent) => { - const rect = canvasRef.current?.getBoundingClientRect(); - if (!rect || !graphData) return; - - const x = e.clientX - rect.left; - const y = e.clientY - rect.top; - - const node = hitTest(graphData.nodes, x, y, transformRef.current); - if (node) { - setSelectedNodeId(node.id); - // Start node drag - draggingNodeRef.current = { - nodeId: node.id, - startX: node.x!, - startY: node.y!, - mouseX: e.clientX, - mouseY: e.clientY, - }; - setDraggingNodeId(node.id); - } else { - setSelectedNodeId(null); - isPanningRef.current = true; - lastMouseRef.current = { x: e.clientX, y: e.clientY }; - } - }, - [graphData] - ); - - const handleMouseMove = useCallback( - (e: React.MouseEvent) => { - if (draggingNodeRef.current && graphData) { - // Dragging a node - const { nodeId, startX, startY, mouseX, mouseY } = draggingNodeRef.current; - const zoom = transformRef.current.zoom; - const deltaX = (e.clientX - mouseX) / zoom; - const deltaY = (e.clientY - mouseY) / zoom; - const newX = startX + deltaX; - const newY = startY + deltaY; - - // Update node position - const node = graphData.nodes.find((n) => n.id === nodeId); - if (node) { - node.x = newX; - node.y = newY; - nodePositionOverrides.current.set(nodeId, { x: newX, y: newY }); - } - - if (canvasRef.current) { - canvasRef.current.style.cursor = 'grabbing'; - } - requestDraw(); - return; - } - - if (isPanningRef.current) { - const dx = e.clientX - lastMouseRef.current.x; - const dy = e.clientY - lastMouseRef.current.y; - transformRef.current.panX += dx; - transformRef.current.panY += dy; - lastMouseRef.current = { x: e.clientX, y: e.clientY }; - requestDraw(); - return; - } - - const rect = canvasRef.current?.getBoundingClientRect(); - if (!rect || !graphData) return; - - const x = e.clientX - rect.left; - const y = e.clientY - rect.top; - const node = hitTest(graphData.nodes, x, y, transformRef.current); - const newHoveredId = node?.id ?? null; - if (newHoveredId !== hoveredNodeId) { - setHoveredNodeId(newHoveredId); - } - - // Cursor style - if (canvasRef.current) { - canvasRef.current.style.cursor = node ? 'grab' : 'default'; - } - }, - [graphData, hoveredNodeId, requestDraw] - ); - - const handleMouseUp = useCallback(() => { - draggingNodeRef.current = null; - setDraggingNodeId(null); - isPanningRef.current = false; - if (canvasRef.current) { - canvasRef.current.style.cursor = hoveredNodeId ? 'grab' : 'default'; - } - }, [hoveredNodeId]); - - const handleDoubleClick = useCallback( - (e: React.MouseEvent) => { - const rect = canvasRef.current?.getBoundingClientRect(); - if (!rect || !graphData) return; - - const x = e.clientX - rect.left; - const y = e.clientY - rect.top; - const node = hitTest(graphData.nodes, x, y, transformRef.current); - - if (node?.type === 'agent' && node.sessionId) { - setActiveSessionId(node.sessionId); - onClose(); - } - }, - [graphData, setActiveSessionId, onClose] - ); - - const handleWheel = useCallback( - (e: React.WheelEvent) => { - e.preventDefault(); - const rect = canvasRef.current?.getBoundingClientRect(); - if (!rect) return; - - const mouseX = e.clientX - rect.left; - const mouseY = e.clientY - rect.top; - - const zoomFactor = e.deltaY < 0 ? 1.08 : 1 / 1.08; - const newZoom = Math.max(0.2, Math.min(3, transformRef.current.zoom * zoomFactor)); - - // Zoom toward mouse position - const scale = newZoom / transformRef.current.zoom; - transformRef.current.panX = mouseX - scale * (mouseX - transformRef.current.panX); - transformRef.current.panY = mouseY - scale * (mouseY - transformRef.current.panY); - transformRef.current.zoom = newZoom; - - requestDraw(); - }, - [requestDraw] - ); - - // Handle layout type change - const handleLayoutTypeChange = useCallback((type: CueLayoutType) => { - // Clear position overrides when changing layout - nodePositionOverrides.current.clear(); - setLayoutType(type); - setShowLayoutDropdown(false); - }, []); - - // Selected node info - const selectedNode = useMemo( - () => graphData?.nodes.find((n) => n.id === selectedNodeId) ?? null, - [graphData, selectedNodeId] - ); - - if (loading) { - return ( -
- Loading Cue graph... -
- ); - } - - if (!graphData || graphData.nodes.length === 0) { - return ( -
- - No Cue subscriptions found. Create .maestro/cue.yaml in a project to see the graph. - -
- ); - } - - return ( -
- {/* Toolbar */} -
-
- - {graphData.nodes.filter((n) => n.type === 'agent').length} agents - · - {graphData.nodes.filter((n) => n.type === 'trigger').length} triggers - · - {graphData.edges.length} connections - - - {/* Layout dropdown */} -
- - - {showLayoutDropdown && ( -
- {(['hierarchical', 'force'] as CueLayoutType[]).map((type) => ( - - ))} -
- )} -
-
-
- {selectedNode?.type === 'agent' && selectedNode.sessionId && ( - - )} - - - Double-click agent to switch focus - -
-
- - {/* Canvas */} -
- -
-
- ); -} diff --git a/src/renderer/components/CueModal.tsx b/src/renderer/components/CueModal.tsx index 6be1ebd38..7a8b6d6de 100644 --- a/src/renderer/components/CueModal.tsx +++ b/src/renderer/components/CueModal.tsx @@ -24,40 +24,15 @@ import { MODAL_PRIORITIES } from '../constants/modalPriorities'; import { useCue } from '../hooks/useCue'; import type { CueSessionStatus, CueRunResult } from '../hooks/useCue'; import { CueHelpContent } from './CueHelpModal'; -// Kept for reference - visual pipeline editor replaces this -// import { CueGraphView } from './CueGraphView'; import { CuePipelineEditor } from './CuePipelineEditor'; import { useSessionStore } from '../stores/sessionStore'; import { getModalActions } from '../stores/modalStore'; -import type { CuePipeline } from '../../shared/cue-pipeline-types'; +import type { CuePipeline, CueGraphSession } from '../../shared/cue-pipeline-types'; import { getPipelineColorForAgent } from './CuePipelineEditor/pipelineColors'; import { graphSessionsToPipelines } from './CuePipelineEditor/utils/yamlToPipeline'; type CueModalTab = 'dashboard' | 'pipeline'; -interface CueGraphSession { - sessionId: string; - sessionName: string; - toolType: string; - subscriptions: Array<{ - name: string; - event: string; - enabled: boolean; - prompt?: string; - output_prompt?: string; - interval_minutes?: number; - schedule_times?: string[]; - schedule_days?: string[]; - watch?: string; - source_session?: string | string[]; - fan_out?: string[]; - filter?: Record; - repo?: string; - poll_minutes?: number; - agent_id?: string; - }>; -} - interface CueModalProps { theme: Theme; onClose: () => void; @@ -1029,7 +1004,6 @@ export function CueModal({ theme, onClose, cueShortcutKeys }: CueModalProps) { , document.body )} - {showHelp && } ); } diff --git a/src/renderer/components/CuePipelineEditor/CuePipelineEditor.tsx b/src/renderer/components/CuePipelineEditor/CuePipelineEditor.tsx index bfc49986d..9a2564dc4 100644 --- a/src/renderer/components/CuePipelineEditor/CuePipelineEditor.tsx +++ b/src/renderer/components/CuePipelineEditor/CuePipelineEditor.tsx @@ -26,6 +26,7 @@ import type { Theme } from '../../types'; import type { CuePipelineState, CuePipeline, + CueGraphSession, PipelineNode, PipelineEdge as PipelineEdgeType, PipelineLayoutState, @@ -54,30 +55,6 @@ import { CueSettingsPanel } from './panels/CueSettingsPanel'; import type { CueSettings } from '../../../main/cue/cue-types'; import { DEFAULT_CUE_SETTINGS } from '../../../main/cue/cue-types'; -interface CueGraphSession { - sessionId: string; - sessionName: string; - toolType: string; - subscriptions: Array<{ - name: string; - event: string; - enabled: boolean; - prompt?: string; - output_prompt?: string; - interval_minutes?: number; - schedule_times?: string[]; - schedule_days?: string[]; - watch?: string; - source_session?: string | string[]; - fan_out?: string[]; - filter?: Record; - repo?: string; - poll_minutes?: number; - agent_id?: string; - label?: string; - }>; -} - interface SessionInfo { id: string; groupId?: string; diff --git a/src/shared/cue-pipeline-types.ts b/src/shared/cue-pipeline-types.ts index 8000f040d..33c35cada 100644 --- a/src/shared/cue-pipeline-types.ts +++ b/src/shared/cue-pipeline-types.ts @@ -114,6 +114,34 @@ export interface PipelineLayoutState { viewport?: PipelineViewport; } +/** Session data with subscriptions for the Cue graph/pipeline visualization (renderer-safe mirror of cue-types.ts CueGraphSession) */ +export interface CueGraphSession { + sessionId: string; + sessionName: string; + toolType: string; + subscriptions: Array<{ + name: string; + event: CueEventType; + enabled: boolean; + prompt?: string; + prompt_file?: string; + output_prompt?: string; + output_prompt_file?: string; + interval_minutes?: number; + schedule_times?: string[]; + schedule_days?: string[]; + watch?: string; + source_session?: string | string[]; + fan_out?: string[]; + filter?: Record; + repo?: string; + poll_minutes?: number; + gh_state?: string; + agent_id?: string; + label?: string; + }>; +} + /** Returns the first unused color from the palette, cycling if all used. */ export function getNextPipelineColor(existingPipelines: CuePipeline[]): string { const usedColors = new Set(existingPipelines.map((p) => p.color));