diff --git a/packages/cli/src/cli/commands/dispatch.ts b/packages/cli/src/cli/commands/dispatch.ts index ce0f756..350052f 100644 --- a/packages/cli/src/cli/commands/dispatch.ts +++ b/packages/cli/src/cli/commands/dispatch.ts @@ -179,6 +179,50 @@ export function registerDispatchCommands(program: Command, defaultActor: string) ), ); + addWorkspaceOption( + dispatchCmd + .command('retry ') + .description('Retry a failed run by creating a new run attempt') + .option('-a, --actor ', 'Actor', defaultActor) + .option('--adapter ', 'Adapter override for retry') + .option('--objective ', 'Objective override for retry') + .option('--no-execute', 'Create retry run but do not execute immediately') + .option('--agents ', 'Comma-separated agent identities') + .option('--max-steps ', 'Maximum scheduler steps', '200') + .option('--step-delay-ms ', 'Delay between scheduling steps', '25') + .option('--space ', 'Restrict execution to one space') + .option('--timeout-ms ', 'Execution timeout in milliseconds') + .option('--dispatch-mode ', 'direct|self-assembly') + .option('--self-assembly-agent ', 'Agent identity for self-assembly dispatch mode') + .option('--json', 'Emit structured JSON output'), + ).action((runId, opts) => + runCommand( + opts, + async () => { + const workspacePath = resolveWorkspacePath(opts); + return { + run: await workgraph.dispatch.retryRun(workspacePath, runId, { + actor: opts.actor, + adapter: opts.adapter, + objective: opts.objective, + execute: opts.execute, + agents: csv(opts.agents), + maxSteps: Number.parseInt(String(opts.maxSteps), 10), + stepDelayMs: Number.parseInt(String(opts.stepDelayMs), 10), + space: opts.space, + timeoutMs: opts.timeoutMs ? Number.parseInt(String(opts.timeoutMs), 10) : undefined, + dispatchMode: opts.dispatchMode, + selfAssemblyAgent: opts.selfAssemblyAgent, + }), + }; + }, + (result) => [ + `Retried run: ${result.run.id} [${result.run.status}]`, + ...(result.run.context?.retry_of_run_id ? [`Source run: ${String(result.run.context.retry_of_run_id)}`] : []), + ], + ), + ); + addWorkspaceOption( dispatchCmd .command('followup ') diff --git a/packages/cli/src/cli/commands/trigger.ts b/packages/cli/src/cli/commands/trigger.ts index b1acf99..f95a58b 100644 --- a/packages/cli/src/cli/commands/trigger.ts +++ b/packages/cli/src/cli/commands/trigger.ts @@ -18,19 +18,53 @@ export function registerTriggerCommands(program: Command, defaultActor: string): .option('-a, --actor ', 'Actor', defaultActor) .option('--event-key ', 'Deterministic event key for idempotency') .option('--objective ', 'Override run objective') + .option('--adapter ', 'Adapter override for dispatched run') + .option('--execute', 'Execute the triggered run immediately') + .option('--retry-failed', 'Retry failed run when idempotency resolves to failed status') + .option('--agents ', 'Comma-separated agent identities for execution') + .option('--max-steps ', 'Maximum scheduler steps for execution') + .option('--step-delay-ms ', 'Delay between scheduling steps for execution') + .option('--space ', 'Restrict execution to one space') + .option('--timeout-ms ', 'Execution timeout in milliseconds') + .option('--dispatch-mode ', 'direct|self-assembly') + .option('--self-assembly-agent ', 'Agent identity for self-assembly dispatch mode') .option('--json', 'Emit structured JSON output'), ).action((triggerPath, opts) => runCommand( opts, - () => { + async () => { const workspacePath = resolveWorkspacePath(opts); + if (opts.execute) { + return workgraph.trigger.fireTriggerAndExecute(workspacePath, triggerPath, { + actor: opts.actor, + eventKey: opts.eventKey, + objective: opts.objective, + adapter: opts.adapter, + retryFailed: Boolean(opts.retryFailed), + executeInput: { + agents: opts.agents ? String(opts.agents).split(',').map((entry: string) => entry.trim()).filter(Boolean) : undefined, + maxSteps: opts.maxSteps ? Number.parseInt(String(opts.maxSteps), 10) : undefined, + stepDelayMs: opts.stepDelayMs ? Number.parseInt(String(opts.stepDelayMs), 10) : undefined, + space: opts.space, + timeoutMs: opts.timeoutMs ? Number.parseInt(String(opts.timeoutMs), 10) : undefined, + dispatchMode: opts.dispatchMode, + selfAssemblyAgent: opts.selfAssemblyAgent, + }, + }); + } return workgraph.trigger.fireTrigger(workspacePath, triggerPath, { actor: opts.actor, eventKey: opts.eventKey, objective: opts.objective, + adapter: opts.adapter, }); }, (result) => [ + ...(() => { + const executedResult = result as { executed?: boolean; retriedFromRunId?: string }; + if (!executedResult.executed) return []; + return [`Executed: yes${executedResult.retriedFromRunId ? ` (retried from ${executedResult.retriedFromRunId})` : ''}`]; + })(), `Fired trigger: ${result.triggerPath}`, `Run: ${result.run.id} [${result.run.status}]`, ], @@ -46,24 +80,64 @@ export function registerTriggerCommands(program: Command, defaultActor: string): .command('run') .description('Process one trigger-engine cycle') .option('-a, --actor ', 'Actor', defaultActor) + .option('--execute-runs', 'Execute dispatch-run actions as full run->evidence loop') + .option('--retry-failed-runs', 'Retry failed runs when dispatch-run hits failed idempotent runs') + .option('--agents ', 'Comma-separated agent identities for execution') + .option('--max-steps ', 'Maximum scheduler steps for execution') + .option('--step-delay-ms ', 'Delay between scheduling steps for execution') + .option('--space ', 'Restrict execution to one space') + .option('--timeout-ms ', 'Execution timeout in milliseconds') + .option('--dispatch-mode ', 'direct|self-assembly') + .option('--self-assembly-agent ', 'Agent identity for self-assembly dispatch mode') .option('--json', 'Emit structured JSON output'), ).action((opts) => runCommand( opts, - () => { + async () => { const workspacePath = resolveWorkspacePath(opts); + if (opts.executeRuns) { + return workgraph.triggerEngine.runTriggerRunEvidenceLoop(workspacePath, { + actor: opts.actor, + retryFailedRuns: Boolean(opts.retryFailedRuns), + execution: { + agents: opts.agents ? String(opts.agents).split(',').map((entry: string) => entry.trim()).filter(Boolean) : undefined, + maxSteps: opts.maxSteps ? Number.parseInt(String(opts.maxSteps), 10) : undefined, + stepDelayMs: opts.stepDelayMs ? Number.parseInt(String(opts.stepDelayMs), 10) : undefined, + space: opts.space, + timeoutMs: opts.timeoutMs ? Number.parseInt(String(opts.timeoutMs), 10) : undefined, + dispatchMode: opts.dispatchMode, + selfAssemblyAgent: opts.selfAssemblyAgent, + }, + }); + } return workgraph.triggerEngine.runTriggerEngineCycle(workspacePath, { actor: opts.actor, }); }, - (result) => [ - `Evaluated: ${result.evaluated} triggers`, - `Fired: ${result.fired}`, - `Errors: ${result.errors}`, - ...result.triggers.map((t) => - ` ${t.triggerPath}: ${t.fired ? 'FIRED' : 'skipped'} (${t.reason})${t.error ? ` error: ${t.error}` : ''}`, - ), - ], + (result) => { + if ('cycle' in result) { + return [ + `Evaluated: ${result.cycle.evaluated} triggers`, + `Fired: ${result.cycle.fired}`, + `Errors: ${result.cycle.errors}`, + `Executed runs: ${result.executedRuns.length} (succeeded=${result.succeeded}, failed=${result.failed}, cancelled=${result.cancelled}, skipped=${result.skipped})`, + ...result.cycle.triggers.map((t) => + ` ${t.triggerPath}: ${t.fired ? 'FIRED' : 'skipped'} (${t.reason})${t.error ? ` error: ${t.error}` : ''}`, + ), + ...result.executedRuns.map((run) => + ` run ${run.runId}: ${run.status}${run.retriedFromRunId ? ` (retried from ${run.retriedFromRunId})` : ''}${run.error ? ` error: ${run.error}` : ''}`, + ), + ]; + } + return [ + `Evaluated: ${result.evaluated} triggers`, + `Fired: ${result.fired}`, + `Errors: ${result.errors}`, + ...result.triggers.map((t) => + ` ${t.triggerPath}: ${t.fired ? 'FIRED' : 'skipped'} (${t.reason})${t.error ? ` error: ${t.error}` : ''}`, + ), + ]; + }, ), ); } diff --git a/packages/kernel/src/__snapshots__/schema-drift-regression.test.ts.snap b/packages/kernel/src/__snapshots__/schema-drift-regression.test.ts.snap index 2591a3c..22dd691 100644 --- a/packages/kernel/src/__snapshots__/schema-drift-regression.test.ts.snap +++ b/packages/kernel/src/__snapshots__/schema-drift-regression.test.ts.snap @@ -110,6 +110,16 @@ exports[`schema drift regression > locks CLI option signatures for critical comm "-a, --actor ", "--event-key ", "--objective ", + "--adapter ", + "--execute", + "--retry-failed", + "--agents ", + "--max-steps ", + "--step-delay-ms ", + "--space ", + "--timeout-ms ", + "--dispatch-mode ", + "--self-assembly-agent ", "--json", "-w, --workspace ", "--vault ", diff --git a/packages/kernel/src/dispatch-evidence-loop.test.ts b/packages/kernel/src/dispatch-evidence-loop.test.ts new file mode 100644 index 0000000..08b3fe4 --- /dev/null +++ b/packages/kernel/src/dispatch-evidence-loop.test.ts @@ -0,0 +1,159 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import { spawnSync } from 'node:child_process'; +import { loadRegistry, saveRegistry } from './registry.js'; +import { + auditTrail, + createRun, + executeRun, + listRunEvidence, + retryRun, +} from './dispatch.js'; +import { registerDispatchAdapter } from './runtime-adapter-registry.js'; +import type { DispatchAdapter, DispatchAdapterExecutionInput, DispatchAdapterExecutionResult } from './runtime-adapter-contracts.js'; + +let workspacePath: string; +let gitAvailable = false; + +beforeEach(() => { + workspacePath = fs.mkdtempSync(path.join(os.tmpdir(), 'wg-dispatch-evidence-')); + const registry = loadRegistry(workspacePath); + saveRegistry(workspacePath, registry); + const gitInit = spawnSync('git', ['init'], { + cwd: workspacePath, + stdio: 'ignore', + }); + gitAvailable = (gitInit.status ?? 1) === 0; +}); + +afterEach(() => { + fs.rmSync(workspacePath, { recursive: true, force: true }); +}); + +describe('dispatch run evidence loop', () => { + it('captures immutable audit trail and execution evidence', async () => { + const command = `"${process.execPath}" -e "const fs=require('fs'); fs.mkdirSync('artifacts',{recursive:true}); fs.writeFileSync('artifacts/evidence.txt','ok'); console.log('tests: 3 passed, 0 failed'); console.log('proof artifacts/evidence.txt'); console.log('https://github.com/versatly/workgraph/pull/4242');"`; + const run = createRun(workspacePath, { + actor: 'agent-evidence', + adapter: 'shell-worker', + objective: 'Collect execution evidence', + context: { + shell_command: command, + }, + }); + + const executed = await executeRun(workspacePath, run.id, { + actor: 'agent-evidence', + timeoutMs: 10_000, + }); + + expect(executed.status).toBe('succeeded'); + expect((executed.evidenceChain?.count ?? 0) > 0).toBe(true); + expect((executed.audit?.eventCount ?? 0) > 0).toBe(true); + + const evidence = listRunEvidence(workspacePath, run.id); + const evidenceTypes = new Set(evidence.map((entry) => entry.type)); + expect(evidenceTypes.has('stdout')).toBe(true); + expect(evidenceTypes.has('test-result')).toBe(true); + expect(evidenceTypes.has('pr-url')).toBe(true); + expect(evidenceTypes.has('attachment')).toBe(true); + if (gitAvailable) { + expect(evidenceTypes.has('file-change')).toBe(true); + } + + const trail = auditTrail(workspacePath, run.id); + expect(trail.some((entry) => entry.kind === 'run-created')).toBe(true); + expect(trail.some((entry) => entry.kind === 'run-execution-started')).toBe(true); + expect(trail.some((entry) => entry.kind === 'run-evidence-collected')).toBe(true); + expect(trail.some((entry) => entry.kind === 'run-execution-finished')).toBe(true); + }); + + it('fails gracefully on execution timeout and records timeout audit event', async () => { + registerDispatchAdapter('test-timeout-adapter', () => + makeAdapter(async () => + new Promise(() => { + // Intentional never-resolving execution promise to trigger dispatcher timeout. + })), + ); + + const run = createRun(workspacePath, { + actor: 'agent-timeout', + adapter: 'test-timeout-adapter', + objective: 'Trigger timeout path', + }); + + const finished = await executeRun(workspacePath, run.id, { + actor: 'agent-timeout', + timeoutMs: 25, + }); + + expect(finished.status).toBe('failed'); + expect(finished.error).toContain('timed out'); + const trail = auditTrail(workspacePath, run.id); + expect(trail.some((entry) => entry.kind === 'run-execution-timeout')).toBe(true); + }); + + it('retries failed runs into a new attempt', async () => { + registerDispatchAdapter('test-retry-adapter', () => + makeAdapter(async (input) => { + if (input.context?.retry_attempt) { + return { + status: 'succeeded', + output: 'retry succeeded', + logs: [], + }; + } + return { + status: 'failed', + error: 'first attempt failed', + logs: [], + }; + }), + ); + + const source = createRun(workspacePath, { + actor: 'agent-retry', + adapter: 'test-retry-adapter', + objective: 'Retry target', + }); + const failed = await executeRun(workspacePath, source.id, { actor: 'agent-retry' }); + expect(failed.status).toBe('failed'); + + const retried = await retryRun(workspacePath, source.id, { + actor: 'agent-retry', + }); + expect(retried.id).not.toBe(source.id); + expect(retried.status).toBe('succeeded'); + expect(retried.context?.retry_of_run_id).toBe(source.id); + expect(retried.context?.retry_attempt).toBe(1); + + const sourceTrail = auditTrail(workspacePath, source.id); + expect(sourceTrail.some((entry) => entry.kind === 'run-retried')).toBe(true); + }); +}); + +function makeAdapter( + executeImpl: (input: DispatchAdapterExecutionInput) => Promise, +): DispatchAdapter { + return { + name: 'test-adapter', + async create() { + return { runId: 'external-run', status: 'queued' }; + }, + async status(runId: string) { + return { runId, status: 'running' }; + }, + async followup(runId: string) { + return { runId, status: 'running' }; + }, + async stop(runId: string) { + return { runId, status: 'cancelled' }; + }, + async logs() { + return []; + }, + execute: executeImpl, + }; +} diff --git a/packages/kernel/src/dispatch-run-audit.ts b/packages/kernel/src/dispatch-run-audit.ts new file mode 100644 index 0000000..f977a7e --- /dev/null +++ b/packages/kernel/src/dispatch-run-audit.ts @@ -0,0 +1,82 @@ +import fs from 'node:fs'; +import path from 'node:path'; +import { createHash, randomUUID } from 'node:crypto'; +import type { DispatchRunAuditEvent, DispatchRunAuditEventKind } from './types.js'; + +const RUN_AUDIT_FILE = '.workgraph/dispatch-run-audit.jsonl'; + +export interface AppendDispatchRunAuditEventInput { + runId: string; + actor: string; + kind: DispatchRunAuditEventKind; + data?: Record; + ts?: string; +} + +export function appendDispatchRunAuditEvent( + workspacePath: string, + input: AppendDispatchRunAuditEventInput, +): DispatchRunAuditEvent { + const now = input.ts ?? new Date().toISOString(); + const existing = listDispatchRunAuditEvents(workspacePath, input.runId); + const last = existing[existing.length - 1]; + const event: Omit = { + id: `runevt_${randomUUID()}`, + runId: input.runId, + seq: (last?.seq ?? 0) + 1, + ts: now, + actor: input.actor, + kind: input.kind, + data: input.data ?? {}, + prevHash: last?.hash, + }; + const hash = hashAuditEvent(event); + const fullEvent: DispatchRunAuditEvent = { + ...event, + hash, + }; + appendAuditLine(workspacePath, fullEvent); + return fullEvent; +} + +export function listDispatchRunAuditEvents( + workspacePath: string, + runId?: string, +): DispatchRunAuditEvent[] { + const filePath = runAuditPath(workspacePath); + if (!fs.existsSync(filePath)) return []; + const lines = fs.readFileSync(filePath, 'utf-8') + .split('\n') + .map((entry) => entry.trim()) + .filter(Boolean); + const parsed: DispatchRunAuditEvent[] = []; + for (const line of lines) { + try { + const event = JSON.parse(line) as DispatchRunAuditEvent; + if (runId && event.runId !== runId) continue; + parsed.push(event); + } catch { + continue; + } + } + return parsed; +} + +export function runAuditPath(workspacePath: string): string { + return path.join(workspacePath, RUN_AUDIT_FILE); +} + +function appendAuditLine(workspacePath: string, event: DispatchRunAuditEvent): void { + const filePath = runAuditPath(workspacePath); + const directory = path.dirname(filePath); + if (!fs.existsSync(directory)) { + fs.mkdirSync(directory, { recursive: true }); + } + fs.appendFileSync(filePath, `${JSON.stringify(event)}\n`, 'utf-8'); +} + +function hashAuditEvent(event: Omit): string { + return createHash('sha256') + .update(JSON.stringify(event)) + .digest('hex'); +} diff --git a/packages/kernel/src/dispatch-run-evidence.ts b/packages/kernel/src/dispatch-run-evidence.ts new file mode 100644 index 0000000..3cc8e46 --- /dev/null +++ b/packages/kernel/src/dispatch-run-evidence.ts @@ -0,0 +1,194 @@ +import { spawnSync } from 'node:child_process'; +import { randomUUID } from 'node:crypto'; +import { collectThreadEvidence } from './evidence.js'; +import type { DispatchAdapterExecutionResult } from './runtime-adapter-contracts.js'; +import type { DispatchRunEvidenceItem } from './types.js'; + +const PR_URL_PATTERN = /\bhttps?:\/\/github\.com\/[^/\s]+\/[^/\s]+\/pull\/\d+\b/gi; +const MAX_EVIDENCE_TEXT_CHARS = 3_000; +const MAX_TEST_SIGNALS = 20; + +export interface DispatchExecutionEvidenceInput { + runId: string; + execution: DispatchAdapterExecutionResult; + beforeGitState: Set | null; + afterGitState: Set | null; +} + +export interface DispatchExecutionEvidenceResult { + items: DispatchRunEvidenceItem[]; + summary: { + count: number; + byType: Record; + lastCollectedAt: string; + }; +} + +export function captureWorkspaceGitState(workspacePath: string): Set | null { + const result = spawnSync('git', ['status', '--porcelain', '--untracked-files=all'], { + cwd: workspacePath, + encoding: 'utf-8', + }); + if ((result.status ?? 1) !== 0) return null; + const files = new Set(); + for (const rawLine of result.stdout.split('\n')) { + const line = rawLine.trimEnd(); + if (!line) continue; + const payload = line.slice(3).trim(); + if (!payload) continue; + if (payload.includes(' -> ')) { + const [, target] = payload.split(' -> '); + if (target) files.add(target.trim()); + continue; + } + files.add(payload); + } + return files; +} + +export function collectDispatchExecutionEvidence( + input: DispatchExecutionEvidenceInput, +): DispatchExecutionEvidenceResult { + const now = new Date().toISOString(); + const evidence: DispatchRunEvidenceItem[] = []; + const output = readOptionalText(input.execution.output); + const error = readOptionalText(input.execution.error); + const logLines = (input.execution.logs ?? []).map((entry) => `[${entry.level}] ${entry.message}`).join('\n'); + const corpus = [output, error, logLines].filter(Boolean).join('\n'); + + if (output) { + evidence.push(createEvidence(input.runId, now, 'stdout', 'adapter-output', clampText(extractStdout(output)))); + } + if (error) { + evidence.push(createEvidence(input.runId, now, 'stderr', 'adapter-error', clampText(error))); + } + + const prUrls = dedupeStrings(corpus.match(PR_URL_PATTERN) ?? []); + for (const url of prUrls) { + evidence.push(createEvidence(input.runId, now, 'pr-url', 'derived', url)); + } + + const testSignals = extractTestSignals(corpus); + for (const signal of testSignals) { + evidence.push(createEvidence(input.runId, now, 'test-result', 'derived', signal)); + } + + const inferred = collectThreadEvidence(corpus); + for (const item of inferred) { + if (item.type === 'url') { + evidence.push(createEvidence(input.runId, now, 'url', 'derived', item.value)); + } else if (item.type === 'attachment') { + evidence.push(createEvidence(input.runId, now, 'attachment', 'derived', item.value)); + } else if (item.type === 'thread-ref') { + evidence.push(createEvidence(input.runId, now, 'thread-ref', 'derived', item.value)); + } else { + evidence.push(createEvidence(input.runId, now, 'reply-ref', 'derived', item.value)); + } + } + + const changedFiles = diffGitStates(input.beforeGitState, input.afterGitState); + for (const file of changedFiles) { + evidence.push(createEvidence(input.runId, now, 'file-change', 'git', file)); + } + + if (input.execution.metrics && Object.keys(input.execution.metrics).length > 0) { + evidence.push(createEvidence( + input.runId, + now, + 'metric', + 'adapter-metric', + clampText(JSON.stringify(input.execution.metrics)), + )); + } + + const deduped = dedupeEvidence(evidence); + return { + items: deduped, + summary: { + count: deduped.length, + byType: buildTypeCounts(deduped), + lastCollectedAt: now, + }, + }; +} + +function createEvidence( + runId: string, + ts: string, + type: DispatchRunEvidenceItem['type'], + source: DispatchRunEvidenceItem['source'], + value: string, + metadata?: Record, +): DispatchRunEvidenceItem { + return { + id: `runev_${randomUUID()}`, + runId, + ts, + type, + source, + value, + ...(metadata ? { metadata } : {}), + }; +} + +function readOptionalText(value: unknown): string | undefined { + if (typeof value !== 'string') return undefined; + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; +} + +function extractStdout(output: string): string { + const match = output.match(/STDOUT:\n([\s\S]*?)\n\nSTDERR:/); + if (!match?.[1]) return output; + return match[1].trim(); +} + +function clampText(value: string): string { + if (value.length <= MAX_EVIDENCE_TEXT_CHARS) return value; + return `${value.slice(0, MAX_EVIDENCE_TEXT_CHARS)}\n...[truncated]`; +} + +function extractTestSignals(text: string): string[] { + if (!text) return []; + const signals = text + .split('\n') + .map((line) => line.trim()) + .filter((line) => + line.length > 0 + && /test|spec|vitest|jest|pass|fail|coverage/i.test(line) + && /(pass|fail|skip|todo|coverage)/i.test(line), + ); + return dedupeStrings(signals).slice(0, MAX_TEST_SIGNALS); +} + +function diffGitStates(before: Set | null, after: Set | null): string[] { + if (!before || !after) return []; + const diff: string[] = []; + for (const entry of after) { + if (!before.has(entry)) diff.push(entry); + } + return diff.sort((a, b) => a.localeCompare(b)); +} + +function dedupeEvidence(items: DispatchRunEvidenceItem[]): DispatchRunEvidenceItem[] { + const deduped = new Map(); + for (const item of items) { + const key = `${item.type}:${item.source}:${item.value}`; + if (!deduped.has(key)) { + deduped.set(key, item); + } + } + return [...deduped.values()]; +} + +function dedupeStrings(items: string[]): string[] { + return [...new Set(items.map((entry) => entry.trim()).filter(Boolean))]; +} + +function buildTypeCounts(items: DispatchRunEvidenceItem[]): Record { + const counts: Record = {}; + for (const item of items) { + counts[item.type] = (counts[item.type] ?? 0) + 1; + } + return counts; +} diff --git a/packages/kernel/src/dispatch.test.ts b/packages/kernel/src/dispatch.test.ts index 1f294d5..853d9bd 100644 --- a/packages/kernel/src/dispatch.test.ts +++ b/packages/kernel/src/dispatch.test.ts @@ -245,9 +245,9 @@ describe('dispatch core module', () => { adapter: 'test-invalid-terminal', objective: 'Bad terminal status', }); - await expect( - executeRun(workspacePath, invalid.id, { actor: 'agent-adapter' }), - ).rejects.toThrow('invalid terminal status "running"'); + const invalidResult = await executeRun(workspacePath, invalid.id, { actor: 'agent-adapter' }); + expect(invalidResult.status).toBe('failed'); + expect(invalidResult.error).toContain('invalid terminal status "running"'); }); }); diff --git a/packages/kernel/src/dispatch.ts b/packages/kernel/src/dispatch.ts index 335f133..8b08a44 100644 --- a/packages/kernel/src/dispatch.ts +++ b/packages/kernel/src/dispatch.ts @@ -10,12 +10,27 @@ import * as ledger from './ledger.js'; import * as store from './store.js'; import * as thread from './thread.js'; import * as gate from './gate.js'; +import { + appendDispatchRunAuditEvent, + listDispatchRunAuditEvents, +} from './dispatch-run-audit.js'; +import { + captureWorkspaceGitState, + collectDispatchExecutionEvidence, +} from './dispatch-run-evidence.js'; import { resolveDispatchAdapter } from './runtime-adapter-registry.js'; import type { DispatchAdapterLogEntry } from './runtime-adapter-contracts.js'; -import type { DispatchRun, PrimitiveInstance, RunStatus } from './types.js'; +import type { + DispatchRun, + DispatchRunAuditEvent, + DispatchRunEvidenceItem, + PrimitiveInstance, + RunStatus, +} from './types.js'; const RUNS_FILE = '.workgraph/dispatch-runs.json'; const DEFAULT_LEASE_MINUTES = 30; +const DEFAULT_EXECUTE_TIMEOUT_MS = 10 * 60_000; export interface DispatchCreateInput { actor: string; @@ -32,6 +47,10 @@ export interface DispatchExecuteInput { stepDelayMs?: number; space?: string; createCheckpoint?: boolean; + timeoutMs?: number; + dispatchMode?: 'direct' | 'self-assembly'; + selfAssemblyAgent?: string; + selfAssemblyOptions?: Record; } export interface DispatchHeartbeatInput { @@ -62,6 +81,23 @@ export interface DispatchClaimResult { gateCheck: gate.ThreadGateCheckResult; } +export interface DispatchRetryInput { + actor: string; + adapter?: string; + objective?: string; + contextPatch?: Record; + execute?: boolean; + agents?: string[]; + maxSteps?: number; + stepDelayMs?: number; + space?: string; + createCheckpoint?: boolean; + timeoutMs?: number; + dispatchMode?: 'direct' | 'self-assembly'; + selfAssemblyAgent?: string; + selfAssemblyOptions?: Record; +} + export function createRun(workspacePath: string, input: DispatchCreateInput): DispatchRun { assertDispatchMutationAuthorized(workspacePath, input.actor, 'dispatch.run.create', '.workgraph/dispatch-runs', [ 'dispatch:run', @@ -69,7 +105,17 @@ export function createRun(workspacePath: string, input: DispatchCreateInput): Di const state = loadRuns(workspacePath); if (input.idempotencyKey) { const existing = state.runs.find((run) => run.idempotencyKey === input.idempotencyKey); - if (existing) return existing; + if (existing) { + appendDispatchRunAuditEvent(workspacePath, { + runId: existing.id, + actor: input.actor, + kind: 'run-idempotency-hit', + data: { + idempotency_key: input.idempotencyKey, + }, + }); + return hydrateRunWithRuntimeMetadata(workspacePath, existing); + } } const now = new Date().toISOString(); @@ -93,6 +139,17 @@ export function createRun(workspacePath: string, input: DispatchCreateInput): Di state.runs.push(run); saveRuns(workspacePath, state); + appendDispatchRunAuditEvent(workspacePath, { + runId: run.id, + actor: input.actor, + kind: 'run-created', + data: { + adapter: run.adapter, + objective: run.objective, + status: run.status, + idempotency_key: run.idempotencyKey, + }, + }); ledger.append(workspacePath, input.actor, 'create', `.workgraph/runs/${run.id}`, 'run', { adapter: run.adapter, objective: run.objective, @@ -100,7 +157,7 @@ export function createRun(workspacePath: string, input: DispatchCreateInput): Di }); ensureRunPrimitive(workspacePath, run, input.actor); - return run; + return hydrateRunWithRuntimeMetadata(workspacePath, run); } export function claimThread(workspacePath: string, threadRef: string, actor: string): DispatchClaimResult { @@ -123,7 +180,7 @@ export function claimThread(workspacePath: string, threadRef: string, actor: str export function status(workspacePath: string, runId: string): DispatchRun { const run = getRun(workspacePath, runId); if (!run) throw new Error(`Run not found: ${runId}`); - return run; + return hydrateRunWithRuntimeMetadata(workspacePath, run); } export function followup(workspacePath: string, runId: string, actor: string, input: string): DispatchRun { @@ -145,12 +202,21 @@ export function followup(workspacePath: string, runId: string, actor: string, in message: `Follow-up from ${actor}: ${input}`, }); saveRuns(workspacePath, state); + appendDispatchRunAuditEvent(workspacePath, { + runId: run.id, + actor, + kind: 'run-followup', + data: { + input, + status: run.status, + }, + }); ledger.append(workspacePath, actor, 'update', `.workgraph/runs/${run.id}`, 'run', { followup: true, status: run.status, }); syncRunPrimitive(workspacePath, run, actor); - return run; + return hydrateRunWithRuntimeMetadata(workspacePath, run); } export function stop(workspacePath: string, runId: string, actor: string): DispatchRun { @@ -184,9 +250,20 @@ export function markRun( target.context = run.context; target.updatedAt = new Date().toISOString(); saveRuns(workspacePath, state); + appendDispatchRunAuditEvent(workspacePath, { + runId: target.id, + actor, + kind: 'run-marked', + data: { + status: target.status, + has_output: Boolean(target.output), + has_error: Boolean(target.error), + context_keys: Object.keys(target.context ?? {}), + }, + }); syncRunPrimitive(workspacePath, target, actor); } - return target ?? run; + return hydrateRunWithRuntimeMetadata(workspacePath, target ?? run); } export function heartbeat( @@ -215,12 +292,22 @@ export function heartbeat( }); saveRuns(workspacePath, state); + appendDispatchRunAuditEvent(workspacePath, { + runId: run.id, + actor: input.actor, + kind: 'run-heartbeat', + data: { + lease_expires: run.leaseExpires, + lease_duration_minutes: run.leaseDurationMinutes, + heartbeat_count: run.heartbeats?.length ?? 0, + }, + }); ledger.append(workspacePath, input.actor, 'update', `.workgraph/runs/${run.id}`, 'run', { heartbeat: true, lease_expires: run.leaseExpires, }); syncRunPrimitive(workspacePath, run, input.actor); - return run; + return hydrateRunWithRuntimeMetadata(workspacePath, run); } export function reconcileExpiredLeases( @@ -256,6 +343,16 @@ export function reconcileExpiredLeases( if (requeuedRuns.length > 0) { saveRuns(workspacePath, state); for (const run of requeuedRuns) { + appendDispatchRunAuditEvent(workspacePath, { + runId: run.id, + actor, + kind: 'run-status-changed', + data: { + from_status: 'running', + to_status: 'queued', + reason: 'lease-expired', + }, + }); ledger.append(workspacePath, actor, 'update', `.workgraph/runs/${run.id}`, 'run', { status: run.status, reconciled_expired_lease: true, @@ -312,6 +409,26 @@ export function handoffRun( to_actor: input.to, reason: input.reason, }); + appendDispatchRunAuditEvent(workspacePath, { + runId: sourceRun.id, + actor: input.actor, + kind: 'run-handoff', + data: { + to_run_id: created.id, + to_actor: input.to, + reason: input.reason, + }, + }); + appendDispatchRunAuditEvent(workspacePath, { + runId: created.id, + actor: input.actor, + kind: 'run-handoff', + data: { + from_run_id: sourceRun.id, + from_actor: sourceRun.actor, + reason: input.reason, + }, + }); return { sourceRun: status(workspacePath, sourceRun.id), @@ -323,9 +440,29 @@ export function logs(workspacePath: string, runId: string): DispatchRun['logs'] return status(workspacePath, runId).logs; } +export function auditTrail(workspacePath: string, runId: string): DispatchRunAuditEvent[] { + const run = status(workspacePath, runId); + return listDispatchRunAuditEvents(workspacePath, run.id); +} + +export function listRunEvidence(workspacePath: string, runId: string): DispatchRunEvidenceItem[] { + const trail = auditTrail(workspacePath, runId); + const evidence: DispatchRunEvidenceItem[] = []; + for (const entry of trail) { + if (entry.kind !== 'run-evidence-collected') continue; + const items = Array.isArray(entry.data.items) ? entry.data.items : []; + for (const item of items) { + if (!item || typeof item !== 'object') continue; + evidence.push(item as DispatchRunEvidenceItem); + } + } + return evidence; +} + export function listRuns(workspacePath: string, options: { status?: RunStatus; limit?: number } = {}): DispatchRun[] { const runs = loadRuns(workspacePath).runs .filter((run) => (options.status ? run.status === options.status : true)) + .map((run) => hydrateRunWithRuntimeMetadata(workspacePath, run)) .sort((a, b) => b.createdAt.localeCompare(a.createdAt)); if (options.limit && options.limit > 0) { return runs.slice(0, options.limit); @@ -351,38 +488,145 @@ export async function executeRun( throw new Error(`Dispatch adapter "${existing.adapter}" does not implement execute().`); } - if (existing.status === 'queued') { - setStatus(workspacePath, runId, input.actor, 'running', `Run started on adapter "${existing.adapter}".`); - } + const resolvedDispatchMode = input.dispatchMode + ?? normalizeDispatchMode(existing.context?.dispatch_mode) + ?? 'direct'; + const resolvedTimeoutMs = normalizeExecutionTimeoutMs( + input.timeoutMs ?? readOptionalNumber(existing.context?.run_timeout_ms), + ); + const beforeGitState = captureWorkspaceGitState(workspacePath); - const execution = await adapter.execute({ - workspacePath, + appendDispatchRunAuditEvent(workspacePath, { runId, actor: input.actor, - objective: existing.objective, - context: existing.context, - agents: input.agents, - maxSteps: input.maxSteps, - stepDelayMs: input.stepDelayMs, - space: input.space, - createCheckpoint: input.createCheckpoint, - isCancelled: () => status(workspacePath, runId).status === 'cancelled', + kind: 'run-execution-started', + data: { + adapter: existing.adapter, + dispatch_mode: resolvedDispatchMode, + timeout_ms: resolvedTimeoutMs, + }, }); - appendRunLogs(workspacePath, runId, input.actor, execution.logs); + if (resolvedDispatchMode === 'self-assembly') { + const selfAssembly = await attemptSelfAssembly(workspacePath, existing, input); + appendRunLogs(workspacePath, runId, input.actor, selfAssembly.logs); + if (!selfAssembly.ok) { + appendDispatchRunAuditEvent(workspacePath, { + runId, + actor: input.actor, + kind: 'run-execution-error', + data: { + dispatch_mode: resolvedDispatchMode, + error: selfAssembly.error, + stage: 'self-assembly', + }, + }); + return markRun(workspacePath, runId, input.actor, 'failed', { + error: selfAssembly.error, + contextPatch: { + dispatch_mode: resolvedDispatchMode, + self_assembly_failed: true, + }, + }); + } + } - const finalStatus = execution.status; - if (finalStatus === 'queued' || finalStatus === 'running') { - throw new Error(`Adapter returned invalid terminal status "${finalStatus}" for execute().`); + if (existing.status === 'queued') { + setStatus(workspacePath, runId, input.actor, 'running', `Run started on adapter "${existing.adapter}".`); } - return markRun(workspacePath, runId, input.actor, finalStatus, { - output: execution.output, - error: execution.error, - contextPatch: execution.metrics - ? { adapter_metrics: execution.metrics } - : undefined, - }); + try { + const execution = await withExecutionTimeout( + adapter.execute({ + workspacePath, + runId, + actor: input.actor, + objective: existing.objective, + context: existing.context, + agents: input.agents, + maxSteps: input.maxSteps, + stepDelayMs: input.stepDelayMs, + space: input.space, + createCheckpoint: input.createCheckpoint, + isCancelled: () => status(workspacePath, runId).status === 'cancelled', + }), + resolvedTimeoutMs, + runId, + ); + + appendRunLogs(workspacePath, runId, input.actor, execution.logs); + const finalStatus = execution.status; + if (finalStatus === 'queued' || finalStatus === 'running') { + throw new Error(`Adapter returned invalid terminal status "${finalStatus}" for execute().`); + } + const afterGitState = captureWorkspaceGitState(workspacePath); + const evidence = collectDispatchExecutionEvidence({ + runId, + execution, + beforeGitState, + afterGitState, + }); + appendDispatchRunAuditEvent(workspacePath, { + runId, + actor: input.actor, + kind: 'run-evidence-collected', + data: { + items: evidence.items, + summary: evidence.summary, + }, + }); + appendDispatchRunAuditEvent(workspacePath, { + runId, + actor: input.actor, + kind: 'run-execution-finished', + data: { + status: finalStatus, + evidence_count: evidence.summary.count, + }, + }); + + return markRun(workspacePath, runId, input.actor, finalStatus, { + output: execution.output, + error: execution.error, + contextPatch: { + ...(execution.metrics ? { adapter_metrics: execution.metrics } : {}), + dispatch_mode: resolvedDispatchMode, + evidence_chain: evidence.summary, + }, + }); + } catch (error) { + const message = errorMessage(error); + const statusValue = status(workspacePath, runId); + if (statusValue.status === 'cancelled') { + appendDispatchRunAuditEvent(workspacePath, { + runId, + actor: input.actor, + kind: 'run-execution-finished', + data: { + status: 'cancelled', + reason: 'execution cancelled', + }, + }); + return statusValue; + } + const kind = message.includes('timed out') + ? 'run-execution-timeout' + : 'run-execution-error'; + appendDispatchRunAuditEvent(workspacePath, { + runId, + actor: input.actor, + kind, + data: { + error: message, + }, + }); + return markRun(workspacePath, runId, input.actor, 'failed', { + error: message, + contextPatch: { + dispatch_mode: resolvedDispatchMode, + }, + }); + } } export async function createAndExecuteRun( @@ -397,6 +641,59 @@ export async function createAndExecuteRun( }); } +export async function retryRun( + workspacePath: string, + runId: string, + input: DispatchRetryInput, +): Promise { + assertDispatchMutationAuthorized(workspacePath, input.actor, 'dispatch.run.retry', runId, [ + 'dispatch:run', + ]); + const source = status(workspacePath, runId); + if (source.status !== 'failed') { + throw new Error(`Run ${runId} is in status "${source.status}". Only failed runs can be retried.`); + } + const priorAttempt = readOptionalNumber(source.context?.retry_attempt) ?? 0; + const retryAttempt = Math.trunc(priorAttempt) + 1; + const retried = createRun(workspacePath, { + actor: input.actor, + adapter: input.adapter ?? source.adapter, + objective: input.objective ?? source.objective, + context: { + ...(source.context ?? {}), + ...(input.contextPatch ?? {}), + retry_of_run_id: source.id, + retry_attempt: retryAttempt, + retry_requested_by: input.actor, + retry_requested_at: new Date().toISOString(), + }, + }); + appendDispatchRunAuditEvent(workspacePath, { + runId: source.id, + actor: input.actor, + kind: 'run-retried', + data: { + retried_run_id: retried.id, + retry_attempt: retryAttempt, + }, + }); + if (input.execute === false) { + return retried; + } + return executeRun(workspacePath, retried.id, { + actor: input.actor, + agents: input.agents, + maxSteps: input.maxSteps, + stepDelayMs: input.stepDelayMs, + space: input.space, + createCheckpoint: input.createCheckpoint, + timeoutMs: input.timeoutMs, + dispatchMode: input.dispatchMode, + selfAssemblyAgent: input.selfAssemblyAgent, + selfAssemblyOptions: input.selfAssemblyOptions, + }); +} + function appendRunLogs( workspacePath: string, runId: string, @@ -413,6 +710,15 @@ function appendRunLogs( run.logs.push(...logEntries); run.updatedAt = new Date().toISOString(); saveRuns(workspacePath, state); + appendDispatchRunAuditEvent(workspacePath, { + runId: run.id, + actor, + kind: 'run-logs-appended', + data: { + count: logEntries.length, + levels: [...new Set(logEntries.map((entry) => entry.level))], + }, + }); ledger.append(workspacePath, actor, 'update', `.workgraph/runs/${run.id}`, 'run', { log_append_count: logEntries.length, }); @@ -432,6 +738,7 @@ function setStatus( const state = loadRuns(workspacePath); const run = state.runs.find((entry) => entry.id === runId); if (!run) throw new Error(`Run not found: ${runId}`); + const previousStatus = run.status; assertRunStatusTransition(run.status, statusValue, runId); const now = new Date().toISOString(); run.status = statusValue; @@ -443,11 +750,21 @@ function setStatus( run.updatedAt = now; run.logs.push({ ts: now, level: 'info', message: logMessage }); saveRuns(workspacePath, state); + appendDispatchRunAuditEvent(workspacePath, { + runId: run.id, + actor, + kind: 'run-status-changed', + data: { + from_status: previousStatus, + to_status: statusValue, + lease_expires: run.leaseExpires, + }, + }); ledger.append(workspacePath, actor, 'update', `.workgraph/runs/${run.id}`, 'run', { status: run.status, }); syncRunPrimitive(workspacePath, run, actor); - return run; + return hydrateRunWithRuntimeMetadata(workspacePath, run); } function runsPath(workspacePath: string): string { @@ -512,20 +829,21 @@ function syncRunPrimitive(workspacePath: string, run: DispatchRun, actor: string const runs = store.list(workspacePath, 'run'); const existing = runs.find((entry) => String(entry.fields.run_id) === run.id); if (!existing) return; + const hydrated = hydrateRunWithRuntimeMetadata(workspacePath, run); store.update( workspacePath, existing.path, { - status: run.status, - runtime: run.adapter, - objective: run.objective, - owner: run.actor, - lease_expires: run.leaseExpires, - lease_duration_minutes: run.leaseDurationMinutes, - last_heartbeat: latestHeartbeat(run), - heartbeat_timestamps: run.heartbeats ?? [], + status: hydrated.status, + runtime: hydrated.adapter, + objective: hydrated.objective, + owner: hydrated.actor, + lease_expires: hydrated.leaseExpires, + lease_duration_minutes: hydrated.leaseDurationMinutes, + last_heartbeat: latestHeartbeat(hydrated), + heartbeat_timestamps: hydrated.heartbeats ?? [], }, - renderRunBody(run), + renderRunBody(hydrated), actor, ); } @@ -569,6 +887,23 @@ function renderRunBody(run: DispatchRun): string { lines.push(run.error); lines.push(''); } + if (run.audit?.eventCount || run.evidenceChain?.count) { + lines.push('## Evidence & Audit'); + lines.push(''); + lines.push(`audit_events: ${run.audit?.eventCount ?? 0}`); + lines.push(`audit_head_hash: ${run.audit?.headHash ?? 'none'}`); + lines.push(`evidence_items: ${run.evidenceChain?.count ?? 0}`); + if (run.evidenceChain?.lastCollectedAt) { + lines.push(`evidence_last_collected_at: ${run.evidenceChain.lastCollectedAt}`); + } + if (run.evidenceChain?.byType && Object.keys(run.evidenceChain.byType).length > 0) { + lines.push('evidence_by_type:'); + for (const [type, count] of Object.entries(run.evidenceChain.byType)) { + lines.push(` - ${type}: ${count}`); + } + } + lines.push(''); + } if (run.context && Object.keys(run.context).length > 0) { lines.push('## Context'); lines.push(''); @@ -588,6 +923,211 @@ function hydrateRun(run: DispatchRun): DispatchRun { }; } +function hydrateRunWithRuntimeMetadata(workspacePath: string, run: DispatchRun): DispatchRun { + const base = hydrateRun(run); + const trail = listDispatchRunAuditEvents(workspacePath, run.id); + const evidenceCount = trail + .filter((entry) => entry.kind === 'run-evidence-collected') + .reduce((total, entry) => { + const items = Array.isArray(entry.data.items) ? entry.data.items : []; + return total + items.length; + }, 0); + const byType: Record = {}; + for (const item of listRunEvidenceFromTrail(trail)) { + byType[item.type] = (byType[item.type] ?? 0) + 1; + } + return { + ...base, + audit: { + eventCount: trail.length, + headHash: trail[trail.length - 1]?.hash, + }, + evidenceChain: { + count: evidenceCount, + byType, + lastCollectedAt: trail.filter((entry) => entry.kind === 'run-evidence-collected').at(-1)?.ts, + }, + }; +} + +function listRunEvidenceFromTrail(trail: DispatchRunAuditEvent[]): DispatchRunEvidenceItem[] { + const items: DispatchRunEvidenceItem[] = []; + for (const entry of trail) { + if (entry.kind !== 'run-evidence-collected') continue; + const rawItems = Array.isArray(entry.data.items) ? entry.data.items : []; + for (const item of rawItems) { + if (!item || typeof item !== 'object') continue; + items.push(item as DispatchRunEvidenceItem); + } + } + return items; +} + +async function attemptSelfAssembly( + workspacePath: string, + run: DispatchRun, + input: DispatchExecuteInput, +): Promise<{ + ok: boolean; + logs: DispatchAdapterLogEntry[]; + error?: string; +}> { + const now = new Date().toISOString(); + const agentName = input.selfAssemblyAgent + ?? readOptionalString(run.context?.self_assembly_agent) + ?? readOptionalString(input.selfAssemblyOptions?.agentName) + ?? input.actor; + const rawOptions = isRecord(run.context?.self_assembly_options) + ? run.context?.self_assembly_options + : undefined; + const mergedOptions = { + ...normalizeSelfAssemblyOptions(rawOptions), + ...normalizeSelfAssemblyOptions(input.selfAssemblyOptions), + }; + try { + const module = await import('./agent-self-assembly.js'); + const result = module.assembleAgent(workspacePath, agentName, { + ...mergedOptions, + }); + return { + ok: true, + logs: [ + { + ts: now, + level: 'info', + message: `Self-assembly dispatched agent "${result.agentName}" before run execution.`, + }, + ...(result.claimedThread + ? [{ + ts: now, + level: 'info' as const, + message: `Self-assembly claimed ${result.claimedThread.path}.`, + }] + : []), + ...(result.warnings.length > 0 + ? result.warnings.map((warning) => ({ + ts: now, + level: 'warn' as const, + message: `Self-assembly warning: ${warning}`, + })) + : []), + ], + }; + } catch (error) { + return { + ok: false, + logs: [{ + ts: now, + level: 'error', + message: `Self-assembly failed: ${errorMessage(error)}`, + }], + error: `Self-assembly failed: ${errorMessage(error)}`, + }; + } +} + +function normalizeSelfAssemblyOptions( + value: Record | undefined, +): SelfAssemblyDispatchOptions { + if (!value) return {}; + const normalized: SelfAssemblyDispatchOptions = {}; + const credentialToken = readOptionalString(value.credentialToken); + const bootstrapToken = readOptionalString(value.bootstrapToken); + const role = readOptionalString(value.role); + const registerActor = readOptionalString(value.registerActor); + const recoveryActor = readOptionalString(value.recoveryActor); + const spaceRef = readOptionalString(value.spaceRef); + const recoveryLimit = readOptionalNumber(value.recoveryLimit); + const leaseTtlMinutes = readOptionalNumber(value.leaseTtlMinutes); + if (credentialToken) normalized.credentialToken = credentialToken; + if (bootstrapToken) normalized.bootstrapToken = bootstrapToken; + if (role) normalized.role = role; + if (registerActor) normalized.registerActor = registerActor; + if (typeof value.recoverStaleClaims === 'boolean') normalized.recoverStaleClaims = value.recoverStaleClaims; + if (recoveryActor) normalized.recoveryActor = recoveryActor; + if (typeof recoveryLimit === 'number') normalized.recoveryLimit = Math.trunc(recoveryLimit); + if (typeof value.recoveryRequired === 'boolean') normalized.recoveryRequired = value.recoveryRequired; + if (spaceRef) normalized.spaceRef = spaceRef; + if (typeof leaseTtlMinutes === 'number') normalized.leaseTtlMinutes = Math.trunc(leaseTtlMinutes); + if (typeof value.createPlanStepIfMissing === 'boolean') { + normalized.createPlanStepIfMissing = value.createPlanStepIfMissing; + } + return normalized; +} + +interface SelfAssemblyDispatchOptions { + credentialToken?: string; + bootstrapToken?: string; + role?: string; + registerActor?: string; + recoverStaleClaims?: boolean; + recoveryActor?: string; + recoveryLimit?: number; + recoveryRequired?: boolean; + spaceRef?: string; + leaseTtlMinutes?: number; + createPlanStepIfMissing?: boolean; +} + +function normalizeDispatchMode(rawValue: unknown): 'direct' | 'self-assembly' | undefined { + const normalized = String(rawValue ?? '').trim().toLowerCase(); + if (normalized === 'direct' || normalized === 'self-assembly') { + return normalized; + } + return undefined; +} + +function normalizeExecutionTimeoutMs(value: unknown): number { + const numeric = readOptionalNumber(value); + if (numeric === undefined || !Number.isFinite(numeric) || numeric <= 0) { + return DEFAULT_EXECUTE_TIMEOUT_MS; + } + return Math.trunc(Math.min(60 * 60_000, Math.max(1_000, numeric))); +} + +async function withExecutionTimeout( + promise: Promise, + timeoutMs: number, + runId: string, +): Promise { + let timeoutHandle: ReturnType | undefined; + try { + return await Promise.race([ + promise, + new Promise((_resolve, reject) => { + timeoutHandle = setTimeout(() => { + reject(new Error(`Dispatch execution timed out after ${timeoutMs}ms for run ${runId}.`)); + }, timeoutMs); + }), + ]); + } finally { + if (timeoutHandle) clearTimeout(timeoutHandle); + } +} + +function readOptionalString(value: unknown): string | undefined { + if (typeof value !== 'string') return undefined; + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; +} + +function readOptionalNumber(value: unknown): number | undefined { + if (typeof value === 'number' && Number.isFinite(value)) return value; + if (typeof value === 'string' && value.trim().length > 0) { + const parsed = Number(value); + if (Number.isFinite(parsed)) return parsed; + } + return undefined; +} + +function isRecord(value: unknown): value is Record { + return !!value && typeof value === 'object' && !Array.isArray(value); +} + +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + function normalizeLeaseMinutes(value: unknown): number { if (typeof value === 'number' && Number.isFinite(value) && value > 0) { return Math.trunc(value); diff --git a/packages/kernel/src/trigger-engine.ts b/packages/kernel/src/trigger-engine.ts index a23db3a..b54d7b6 100644 --- a/packages/kernel/src/trigger-engine.ts +++ b/packages/kernel/src/trigger-engine.ts @@ -154,6 +154,9 @@ export interface StartTriggerEngineOptions { intervalSeconds?: number; maxCycles?: number; logger?: (line: string) => void; + executeRuns?: boolean; + execution?: Omit; + retryFailedRuns?: boolean; } export interface TriggerDashboardItem { @@ -196,6 +199,28 @@ export interface AddSynthesisTriggerResult { trigger: PrimitiveInstance; } +export interface TriggerRunExecutionResult { + runId: string; + triggerPath?: string; + status: DispatchRun['status']; + retriedFromRunId?: string; + error?: string; +} + +export interface TriggerRunEvidenceLoopResult { + cycle: TriggerEngineCycleResult; + executedRuns: TriggerRunExecutionResult[]; + succeeded: number; + failed: number; + cancelled: number; + skipped: number; +} + +export interface TriggerRunEvidenceLoopOptions extends TriggerEngineCycleOptions { + execution?: Omit; + retryFailedRuns?: boolean; +} + export function triggerStatePath(workspacePath: string): string { return path.join(workspacePath, TRIGGER_STATE_FILE); } @@ -394,6 +419,83 @@ export function runTriggerEngineCycle( }; } +export async function runTriggerRunEvidenceLoop( + workspacePath: string, + options: TriggerRunEvidenceLoopOptions = {}, +): Promise { + const cycle = runTriggerEngineCycle(workspacePath, options); + const actor = options.actor ?? 'system'; + const triggerState = loadTriggerState(workspacePath); + const targetRuns = new Map(); + for (const triggerResult of cycle.triggers) { + if (!triggerResult.fired || triggerResult.actionType !== 'dispatch-run') continue; + const runtime = triggerState.triggers[triggerResult.triggerPath]; + const runId = typeof runtime?.lastResult?.run_id === 'string' + ? String(runtime.lastResult.run_id) + : undefined; + if (!runId) continue; + targetRuns.set(runId, triggerResult.triggerPath); + } + + const executedRuns: TriggerRunExecutionResult[] = []; + for (const [runId, triggerPath] of targetRuns) { + try { + const run = dispatch.status(workspacePath, runId); + if ((run.status === 'failed' || run.status === 'cancelled') && options.retryFailedRuns) { + const retried = await dispatch.retryRun(workspacePath, run.id, { + actor, + execute: true, + ...(options.execution ?? {}), + }); + executedRuns.push({ + runId: retried.id, + triggerPath, + status: retried.status, + retriedFromRunId: run.id, + }); + continue; + } + if (run.status === 'queued' || run.status === 'running') { + const executed = await dispatch.executeRun(workspacePath, run.id, { + actor, + ...(options.execution ?? {}), + }); + executedRuns.push({ + runId: executed.id, + triggerPath, + status: executed.status, + }); + continue; + } + executedRuns.push({ + runId: run.id, + triggerPath, + status: run.status, + }); + } catch (error) { + executedRuns.push({ + runId, + triggerPath, + status: 'failed', + error: errorMessage(error), + }); + } + } + + return { + cycle, + executedRuns, + succeeded: executedRuns.filter((entry) => entry.status === 'succeeded').length, + failed: executedRuns.filter((entry) => entry.status === 'failed').length, + cancelled: executedRuns.filter((entry) => entry.status === 'cancelled').length, + skipped: executedRuns.filter((entry) => + entry.status !== 'succeeded' + && entry.status !== 'failed' + && entry.status !== 'cancelled') + .length, + }; +} + export async function startTriggerEngine( workspacePath: string, options: StartTriggerEngineOptions = {}, @@ -406,10 +508,17 @@ export async function startTriggerEngine( let completedCycles = 0; while (options.maxCycles === undefined || completedCycles < options.maxCycles) { - const cycleResult = runTriggerEngineCycle(workspacePath, { - actor, - intervalSeconds, - }); + const cycleResult = options.executeRuns + ? (await runTriggerRunEvidenceLoop(workspacePath, { + actor, + intervalSeconds, + execution: options.execution, + retryFailedRuns: options.retryFailedRuns, + })).cycle + : runTriggerEngineCycle(workspacePath, { + actor, + intervalSeconds, + }); logger( `[${cycleResult.cycleAt}] cycle=${completedCycles + 1} evaluated=${cycleResult.evaluated} fired=${cycleResult.fired} errors=${cycleResult.errors}`, ); diff --git a/packages/kernel/src/trigger-run-evidence-loop.test.ts b/packages/kernel/src/trigger-run-evidence-loop.test.ts new file mode 100644 index 0000000..5193c11 --- /dev/null +++ b/packages/kernel/src/trigger-run-evidence-loop.test.ts @@ -0,0 +1,131 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import { loadRegistry, saveRegistry } from './registry.js'; +import * as dispatch from './dispatch.js'; +import * as store from './store.js'; +import * as thread from './thread.js'; +import { + runTriggerRunEvidenceLoop, +} from './trigger-engine.js'; +import { fireTriggerAndExecute } from './trigger.js'; + +let workspacePath: string; + +beforeEach(() => { + workspacePath = fs.mkdtempSync(path.join(os.tmpdir(), 'wg-trigger-run-loop-')); + const registry = loadRegistry(workspacePath); + saveRegistry(workspacePath, registry); +}); + +afterEach(() => { + fs.rmSync(workspacePath, { recursive: true, force: true }); +}); + +describe('trigger -> run -> evidence loop', () => { + it('executes dispatch runs for all trigger condition types', async () => { + const command = `"${process.execPath}" -e "console.log('loop-ok'); console.log('tests: 2 passed, 0 failed'); console.log('https://github.com/versatly/workgraph/pull/5150');"`; + const cronTrigger = createDispatchTrigger('Cron dispatch', { + type: 'cron', + expression: '* * * * *', + }, command); + const eventTrigger = createDispatchTrigger('Event dispatch', { + type: 'event', + event: 'thread-complete', + }, command); + const fileTrigger = createDispatchTrigger('File dispatch', { + type: 'file-watch', + glob: 'facts/**/*.md', + }, command); + const threadCompleteTrigger = createDispatchTrigger('Thread-complete dispatch', { + type: 'thread-complete', + }, command); + + await runTriggerRunEvidenceLoop(workspacePath, { + actor: 'system', + now: new Date('2026-03-01T00:00:00.000Z'), + execution: { timeoutMs: 10_000 }, + }); + + const completedThread = thread.createThread(workspacePath, 'Trigger source', 'Complete this thread', 'agent-trigger'); + thread.claim(workspacePath, completedThread.path, 'agent-trigger'); + thread.done( + workspacePath, + completedThread.path, + 'agent-trigger', + 'Completed https://github.com/versatly/workgraph/pull/500', + ); + store.create(workspacePath, 'fact', { + title: 'Changed fact', + subject: 'system', + predicate: 'state', + object: 'updated', + tags: ['ops'], + }, '# Fact\n', 'agent-trigger', { pathOverride: 'facts/trigger-change.md' }); + + const second = await runTriggerRunEvidenceLoop(workspacePath, { + actor: 'system', + now: new Date('2026-03-01T00:01:00.000Z'), + execution: { timeoutMs: 10_000 }, + }); + expect(second.executedRuns.length).toBeGreaterThanOrEqual(3); + expect(second.failed).toBe(0); + + const triggeredRuns = dispatch.listRuns(workspacePath) + .filter((run) => typeof run.context?.trigger_path === 'string'); + const runsByTriggerPath = new Map(); + for (const run of triggeredRuns) { + const triggerPath = String(run.context?.trigger_path); + const bucket = runsByTriggerPath.get(triggerPath) ?? []; + bucket.push(run); + runsByTriggerPath.set(triggerPath, bucket); + } + expect(runsByTriggerPath.get(cronTrigger.path)?.some((run) => run.status === 'succeeded')).toBe(true); + expect(runsByTriggerPath.get(eventTrigger.path)?.some((run) => run.status === 'succeeded')).toBe(true); + expect(runsByTriggerPath.get(fileTrigger.path)?.some((run) => run.status === 'succeeded')).toBe(true); + expect(runsByTriggerPath.get(threadCompleteTrigger.path)?.some((run) => run.status === 'succeeded')).toBe(true); + }); + + it('supports manual trigger fire -> execute flow', async () => { + const triggerPrimitive = createDispatchTrigger('Manual dispatch', { + type: 'event', + event: 'manual', + }, `"${process.execPath}" -e "console.log('manual-run');"`); + + const result = await fireTriggerAndExecute(workspacePath, triggerPrimitive.path, { + actor: 'agent-manual', + eventKey: 'manual-evt-1', + adapter: 'shell-worker', + execute: true, + executeInput: { + timeoutMs: 10_000, + }, + }); + + expect(result.executed).toBe(true); + expect(result.run.status).toBe('succeeded'); + expect((result.run.evidenceChain?.count ?? 0) > 0).toBe(true); + }); +}); + +function createDispatchTrigger( + title: string, + condition: Record, + shellCommand: string, +) { + return store.create(workspacePath, 'trigger', { + title, + status: 'active', + condition, + action: { + type: 'dispatch-run', + objective: `${title} objective`, + adapter: 'shell-worker', + context: { + shell_command: shellCommand, + }, + }, + cooldown: 0, + }, '# Trigger\n', 'system'); +} diff --git a/packages/kernel/src/trigger.ts b/packages/kernel/src/trigger.ts index 6847673..46237ec 100644 --- a/packages/kernel/src/trigger.ts +++ b/packages/kernel/src/trigger.ts @@ -12,6 +12,7 @@ export interface FireTriggerOptions { actor: string; eventKey?: string; objective?: string; + adapter?: string; context?: Record; } @@ -21,6 +22,18 @@ export interface FireTriggerResult { idempotencyKey: string; } +export interface FireTriggerAndExecuteOptions extends FireTriggerOptions { + execute?: boolean; + retryFailed?: boolean; + executeInput?: Omit; + retryInput?: Omit; +} + +export interface FireTriggerAndExecuteResult extends FireTriggerResult { + executed: boolean; + retriedFromRunId?: string; +} + export function fireTrigger( workspacePath: string, triggerPath: string, @@ -42,6 +55,7 @@ export function fireTrigger( const run = dispatch.createRun(workspacePath, { actor: options.actor, + adapter: options.adapter, objective, context: { trigger_path: triggerPath, @@ -65,6 +79,52 @@ export function fireTrigger( }; } +export async function fireTriggerAndExecute( + workspacePath: string, + triggerPath: string, + options: FireTriggerAndExecuteOptions, +): Promise { + const fired = fireTrigger(workspacePath, triggerPath, options); + if (options.execute === false) { + return { + ...fired, + executed: false, + }; + } + + if (fired.run.status === 'failed' && options.retryFailed) { + const retried = await dispatch.retryRun(workspacePath, fired.run.id, { + actor: options.actor, + ...(options.retryInput ?? {}), + }); + return { + triggerPath: fired.triggerPath, + idempotencyKey: fired.idempotencyKey, + run: retried, + executed: true, + retriedFromRunId: fired.run.id, + }; + } + + if (fired.run.status === 'queued' || fired.run.status === 'running') { + const executed = await dispatch.executeRun(workspacePath, fired.run.id, { + actor: options.actor, + ...(options.executeInput ?? {}), + }); + return { + triggerPath: fired.triggerPath, + idempotencyKey: fired.idempotencyKey, + run: executed, + executed: true, + }; + } + + return { + ...fired, + executed: false, + }; +} + function buildIdempotencyKey(triggerPath: string, eventSeed: string, objective: string): string { return createHash('sha256') .update(`${triggerPath}:${eventSeed}:${objective}`) diff --git a/packages/kernel/src/types.ts b/packages/kernel/src/types.ts index 988e0c6..1ed6f81 100644 --- a/packages/kernel/src/types.ts +++ b/packages/kernel/src/types.ts @@ -369,6 +369,58 @@ export interface PolicyRegistry { export type RunStatus = 'queued' | 'running' | 'succeeded' | 'failed' | 'cancelled'; +export type DispatchRunEvidenceType = + | 'stdout' + | 'stderr' + | 'log' + | 'file-change' + | 'test-result' + | 'pr-url' + | 'url' + | 'attachment' + | 'thread-ref' + | 'reply-ref' + | 'metric' + | 'error'; + +export interface DispatchRunEvidenceItem { + id: string; + runId: string; + ts: string; + type: DispatchRunEvidenceType; + source: 'adapter-output' | 'adapter-error' | 'adapter-log' | 'adapter-metric' | 'git' | 'derived'; + value: string; + metadata?: Record; +} + +export type DispatchRunAuditEventKind = + | 'run-created' + | 'run-idempotency-hit' + | 'run-status-changed' + | 'run-marked' + | 'run-followup' + | 'run-heartbeat' + | 'run-logs-appended' + | 'run-execution-started' + | 'run-execution-finished' + | 'run-execution-timeout' + | 'run-execution-error' + | 'run-evidence-collected' + | 'run-retried' + | 'run-handoff'; + +export interface DispatchRunAuditEvent { + id: string; + runId: string; + seq: number; + ts: string; + actor: string; + kind: DispatchRunAuditEventKind; + data: Record; + prevHash?: string; + hash: string; +} + export interface DispatchRun { id: string; createdAt: string; @@ -394,4 +446,13 @@ export interface DispatchRun { level: 'info' | 'warn' | 'error'; message: string; }>; + audit?: { + eventCount: number; + headHash?: string; + }; + evidenceChain?: { + count: number; + byType: Record; + lastCollectedAt?: string; + }; }